diff --git a/src/main/java/redis/clients/jedis/CommandObjects.java b/src/main/java/redis/clients/jedis/CommandObjects.java index cbf0e19723..5c6af322dc 100644 --- a/src/main/java/redis/clients/jedis/CommandObjects.java +++ b/src/main/java/redis/clients/jedis/CommandObjects.java @@ -5,6 +5,8 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import java.util.stream.Collectors; import org.json.JSONArray; @@ -50,6 +52,7 @@ protected RedisProtocol getProtocol() { return protocol; } + private Lock mapperLock = new ReentrantLock(true); private volatile JsonObjectMapper jsonObjectMapper; private final AtomicInteger searchDialect = new AtomicInteger(0); @@ -4435,11 +4438,15 @@ public final CommandObject tFunctionCallAsync(String library, String fun private JsonObjectMapper getJsonObjectMapper() { JsonObjectMapper localRef = this.jsonObjectMapper; if (Objects.isNull(localRef)) { - synchronized (this) { + mapperLock.lock(); + + try { localRef = this.jsonObjectMapper; if (Objects.isNull(localRef)) { this.jsonObjectMapper = localRef = new DefaultGsonObjectMapper(); } + } finally { + mapperLock.unlock(); } } return localRef; diff --git a/src/main/java/redis/clients/jedis/JedisSentinelPool.java b/src/main/java/redis/clients/jedis/JedisSentinelPool.java index 586750540c..f6a9ea705d 100644 --- a/src/main/java/redis/clients/jedis/JedisSentinelPool.java +++ b/src/main/java/redis/clients/jedis/JedisSentinelPool.java @@ -6,6 +6,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -28,7 +30,7 @@ public class JedisSentinelPool extends Pool { private volatile HostAndPort currentHostMaster; - private final Object initPoolLock = new Object(); + private final Lock initPoolLock = new ReentrantLock(true); public JedisSentinelPool(String masterName, Set sentinels, final JedisClientConfig masterClientConfig, final JedisClientConfig sentinelClientConfig) { @@ -213,7 +215,9 @@ public HostAndPort getCurrentHostMaster() { } private void initMaster(HostAndPort master) { - synchronized (initPoolLock) { + initPoolLock.lock(); + + try { if (!master.equals(currentHostMaster)) { currentHostMaster = master; factory.setHostAndPort(currentHostMaster); @@ -223,6 +227,8 @@ private void initMaster(HostAndPort master) { LOG.info("Created JedisSentinelPool to master at {}", master); } + } finally { + initPoolLock.unlock(); } } diff --git a/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java b/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java index a9a49c9081..7496e6e928 100644 --- a/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java +++ b/src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import redis.clients.jedis.Builder; @@ -106,9 +108,7 @@ private Builder getBuilder(String graphName) { } private void createBuilder(String graphName) { - synchronized (builders) { - builders.putIfAbsent(graphName, new ResultSetBuilder(new GraphCacheImpl(graphName))); - } + builders.computeIfAbsent(graphName, graphNameKey -> new ResultSetBuilder(new GraphCacheImpl(graphNameKey))); } private class GraphCacheImpl implements GraphCache { @@ -144,6 +144,8 @@ private class GraphCacheList { private final String name; private final String query; private final List data = new CopyOnWriteArrayList<>(); + + private final Lock dataLock = new ReentrantLock(true); /** * @@ -164,14 +166,18 @@ public GraphCacheList(String name, String procedure) { */ public String getCachedData(int index) { if (index >= data.size()) { - synchronized (data) { + dataLock.lock(); + + try { if (index >= data.size()) { getProcedureInfo(); } + } finally { + dataLock.unlock(); } } + return data.get(index); - } /** diff --git a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java index 4ef383e649..91ad29d9a0 100644 --- a/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java +++ b/src/main/java/redis/clients/jedis/mcf/CircuitBreakerFailoverBase.java @@ -1,6 +1,8 @@ package redis.clients.jedis.mcf; import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import redis.clients.jedis.annots.Experimental; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.providers.MultiClusterPooledConnectionProvider; @@ -17,6 +19,7 @@ */ @Experimental public class CircuitBreakerFailoverBase implements AutoCloseable { + private final Lock lock = new ReentrantLock(true); protected final MultiClusterPooledConnectionProvider provider; @@ -32,29 +35,34 @@ public void close() { /** * Functional interface wrapped in retry and circuit breaker logic to handle open circuit breaker failure scenarios */ - protected synchronized void clusterFailover(CircuitBreaker circuitBreaker) { + protected void clusterFailover(CircuitBreaker circuitBreaker) { + lock.lock(); + + try { + // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent + if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { - // Check state to handle race conditions since incrementActiveMultiClusterIndex() is non-idempotent - if (!CircuitBreaker.State.FORCED_OPEN.equals(circuitBreaker.getState())) { + // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. + // To recover/transition from this forced state the user will need to manually failback + circuitBreaker.transitionToForcedOpenState(); - // Transitions state machine to a FORCED_OPEN state, stopping state transition, metrics and event publishing. - // To recover/transition from this forced state the user will need to manually failback - circuitBreaker.transitionToForcedOpenState(); + // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() + // to use the next cluster's connection pool - according to the configuration's prioritization/order + int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); - // Incrementing the activeMultiClusterIndex will allow subsequent calls to the executeCommand() - // to use the next cluster's connection pool - according to the configuration's prioritization/order - int activeMultiClusterIndex = provider.incrementActiveMultiClusterIndex(); + // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging + provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); + } - // Implementation is optionally provided during configuration. Typically, used for activeMultiClusterIndex persistence or custom logging - provider.runClusterFailoverPostProcessor(activeMultiClusterIndex); - } - - // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail - else if (provider.isLastClusterCircuitBreakerForcedOpen()) { - throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + - "provided with an additional cluster/database endpoint according to its prioritized sequence. " + - "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); + // Once the priority list is exhausted only a manual failback can open the circuit breaker so all subsequent operations will fail + else if (provider.isLastClusterCircuitBreakerForcedOpen()) { + throw new JedisConnectionException("Cluster/database endpoint could not failover since the MultiClusterClientConfig was not " + + "provided with an additional cluster/database endpoint according to its prioritized sequence. " + + "If applicable, consider failing back OR restarting with an available cluster/database endpoint"); + } + } finally { + lock.unlock(); } } -} \ No newline at end of file +} diff --git a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java index 47b03c7773..9ddf8e810a 100644 --- a/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/MultiClusterPooledConnectionProvider.java @@ -13,6 +13,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; @@ -54,6 +56,8 @@ public class MultiClusterPooledConnectionProvider implements ConnectionProvider * provided at startup via the MultiClusterClientConfig. All traffic will be routed according to this index. */ private volatile Integer activeMultiClusterIndex = 1; + + private final Lock activeClusterIndexLock = new ReentrantLock(true); /** * Indicates the final cluster/database endpoint (connection pool), according to the pre-configured list @@ -162,8 +166,9 @@ public int incrementActiveMultiClusterIndex() { // Field-level synchronization is used to avoid the edge case in which // setActiveMultiClusterIndex(int multiClusterIndex) is called at the same time - synchronized (activeMultiClusterIndex) { - + activeClusterIndexLock.lock(); + + try { String originalClusterName = getClusterCircuitBreaker().getName(); // Only increment if it can pass this validation otherwise we will need to check for NULL in the data path @@ -185,6 +190,8 @@ public int incrementActiveMultiClusterIndex() { incrementActiveMultiClusterIndex(); else log.warn("Cluster/database endpoint successfully updated from '{}' to '{}'", originalClusterName, circuitBreaker.getName()); + } finally { + activeClusterIndexLock.unlock(); } return activeMultiClusterIndex; @@ -229,11 +236,13 @@ public void validateTargetConnection(int multiClusterIndex) { * Special care should be taken to confirm cluster/database availability AND * potentially cross-cluster replication BEFORE using this capability. */ - public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) { + public void setActiveMultiClusterIndex(int multiClusterIndex) { // Field-level synchronization is used to avoid the edge case in which // incrementActiveMultiClusterIndex() is called at the same time - synchronized (activeMultiClusterIndex) { + activeClusterIndexLock.lock(); + + try { // Allows an attempt to reset the current cluster from a FORCED_OPEN to CLOSED state in the event that no failover is possible if (activeMultiClusterIndex == multiClusterIndex && @@ -256,6 +265,8 @@ public synchronized void setActiveMultiClusterIndex(int multiClusterIndex) { activeMultiClusterIndex = multiClusterIndex; lastClusterCircuitBreakerForcedOpen = false; + } finally { + activeClusterIndexLock.unlock(); } } diff --git a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java index 5058f07179..f2f0746460 100644 --- a/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java +++ b/src/main/java/redis/clients/jedis/providers/SentineledConnectionProvider.java @@ -5,6 +5,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; @@ -43,7 +45,7 @@ public class SentineledConnectionProvider implements ConnectionProvider { private final long subscribeRetryWaitTimeMillis; - private final Object initPoolLock = new Object(); + private final Lock initPoolLock = new ReentrantLock(true); public SentineledConnectionProvider(String masterName, final JedisClientConfig masterClientConfig, Set sentinels, final JedisClientConfig sentinelClientConfig) { @@ -95,7 +97,9 @@ public HostAndPort getCurrentMaster() { } private void initMaster(HostAndPort master) { - synchronized (initPoolLock) { + initPoolLock.lock(); + + try { if (!master.equals(currentMaster)) { currentMaster = master; @@ -114,6 +118,8 @@ private void initMaster(HostAndPort master) { existingPool.close(); } } + } finally { + initPoolLock.unlock(); } }