Skip to content

Commit

Permalink
Fix blocking bug in RedisCache.retrieve(..) when waiting on the cache…
Browse files Browse the repository at this point in the history
… lock.

Closes spring-projects#2650
  • Loading branch information
jxblum committed Sep 25, 2023
1 parent b587361 commit c1e0745
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,17 @@ private BiFunction<byte[], Duration, Mono<byte[]>> reactiveExecutionStrategy(Str

ByteBuffer wrappedKey = ByteBuffer.wrap(key);

Mono<ByteBuffer> result = shouldExpireWithin(ttl)
// Do the same lock check as the regular Cache.get(key); be careful of blocking!
Mono<ByteBuffer> cacheLockCheckMono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
executeLockFree(connection -> checkAndPotentiallyWaitUntilUnlocked(cacheName, connection));
return ByteBuffer.wrap(new byte[0]);
}));

Mono<ByteBuffer> getMono = shouldExpireWithin(ttl)
? executeReactively(connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
: executeReactively(connection -> connection.stringCommands().get(wrappedKey));

// Do the same lock check as the regular Cache.get(key); be careful of blocking!
result = result.doFirst(() -> executeLockFree(connection ->
checkAndPotentiallyWaitUntilUnlocked(cacheName, connection)));
Mono<ByteBuffer> result = cacheLockCheckMono.then(getMono);

@SuppressWarnings("all")
Mono<byte[]> byteArrayResult = result.map(DefaultRedisCacheWriter::nullSafeGetBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.awaitility.Awaitility.await;

import io.netty.util.concurrent.DefaultThreadFactory;

import reactor.core.publisher.Mono;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
Expand All @@ -48,6 +44,7 @@
import java.util.stream.IntStream;

import org.junit.jupiter.api.BeforeEach;

import org.springframework.cache.Cache.ValueWrapper;
import org.springframework.cache.interceptor.SimpleKey;
import org.springframework.cache.interceptor.SimpleKeyGenerator;
Expand All @@ -62,6 +59,10 @@
import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest;
import org.springframework.lang.Nullable;

import io.netty.util.concurrent.DefaultThreadFactory;

import reactor.core.publisher.Mono;

/**
* Tests for {@link RedisCache} with {@link DefaultRedisCacheWriter} using different {@link RedisSerializer} and
* {@link RedisConnectionFactory} pairs.
Expand Down Expand Up @@ -569,9 +570,9 @@ void cacheGetWithTimeToIdleExpirationAfterEntryExpiresShouldReturnNull() {
assertThat(cache.get(this.cacheKey, Person.class)).isNull();
}

@ParameterizedRedisTest // Gh-2650
@ParameterizedRedisTest // GH-2650
@SuppressWarnings("unchecked")
void retrieveReturnsCachedValueCorrectly() throws Exception {
void retrieveReturnsCachedValue() throws Exception {

doWithConnection(connection -> connection.stringCommands().set(this.binaryCacheKey, this.binarySample));

Expand All @@ -581,10 +582,43 @@ void retrieveReturnsCachedValueCorrectly() throws Exception {

assertThat(value).isNotNull();
assertThat(value.get()).isEqualTo(this.sample);
assertThat(value).isDone();
}

@ParameterizedRedisTest // GH-2650
@SuppressWarnings("unchecked")
void retrieveReturnsCachedValueWhenLockIsReleased() throws Exception {

String mockValue = "MockValue";
String testValue = "TestValue";

byte[] binaryCacheValue = this.serializer.serialize(testValue);

doWithConnection(connection -> connection.stringCommands().set(this.binaryCacheKey, binaryCacheValue));

RedisCache cache = new RedisCache("cache", usingLockingRedisCacheWriter(Duration.ofMillis(5L)),
usingRedisCacheConfiguration());

RedisCacheWriter cacheWriter = cache.getCacheWriter();

assertThat(cacheWriter).isInstanceOf(DefaultRedisCacheWriter.class);

((DefaultRedisCacheWriter) cacheWriter).lock("cache");

CompletableFuture<String> value = (CompletableFuture<String>) cache.retrieve(this.key);

assertThat(value).isNotNull();
assertThat(value.getNow(mockValue)).isEqualTo(mockValue);
assertThat(value).isNotDone();

((DefaultRedisCacheWriter) cacheWriter).unlock("cache");

assertThat(value.get(15L, TimeUnit.MILLISECONDS)).isEqualTo(testValue);
assertThat(value).isDone();
}

@ParameterizedRedisTest // Gh-2650
void retrieveReturnsLoadedValueCorrectly() throws Exception {
@ParameterizedRedisTest // GH-2650
void retrieveReturnsLoadedValue() throws Exception {

RedisCache cache = new RedisCache("cache", usingLockingRedisCacheWriter(), usingRedisCacheConfiguration());

Expand All @@ -608,6 +642,7 @@ void retrieveReturnsLoadedValueCorrectly() throws Exception {
assertThat(loaded.get()).isFalse();
assertThat(value.get()).isEqualTo(jon);
assertThat(loaded.get()).isTrue();
assertThat(value).isDone();
}

private RedisCacheConfiguration usingRedisCacheConfiguration() {
Expand All @@ -629,6 +664,11 @@ private RedisCacheWriter usingLockingRedisCacheWriter() {
return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory);
}

private RedisCacheWriter usingLockingRedisCacheWriter(Duration sleepTime) {
return RedisCacheWriter.lockingRedisCacheWriter(this.connectionFactory, sleepTime,
RedisCacheWriter.TtlFunction.persistent(), BatchStrategies.keys());
}

private RedisCacheWriter usingNonLockingRedisCacheWriter() {
return RedisCacheWriter.nonLockingRedisCacheWriter(this.connectionFactory);
}
Expand Down

0 comments on commit c1e0745

Please sign in to comment.