Skip to content

Commit

Permalink
Support asynchronous Cache.retrieve(…) in RedisCache.
Browse files Browse the repository at this point in the history
Closes #2650
Original pull request: #2717
  • Loading branch information
jxblum authored and mp911de committed Oct 13, 2023
1 parent 9bae67e commit a72c426
Show file tree
Hide file tree
Showing 7 changed files with 769 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@
*/
package org.springframework.data.redis.cache;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
Expand Down Expand Up @@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
Assert.notNull(key, "Key must not be null");

byte[] result = shouldExpireWithin(ttl)
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));

statistics.incGets(name);

Expand All @@ -128,6 +137,81 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return result;
}

@Override
public boolean isRetrieveSupported() {
return isReactive();
}

@Override
public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl) {

Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");

CompletableFuture<byte[]> result = nonBlockingRetrieveFunction(name).apply(key, ttl);

result = result.thenApply(cachedValue -> {

statistics.incGets(name);

if (cachedValue != null) {
statistics.incHits(name);
} else {
statistics.incMisses(name);
}

return cachedValue;
});

return result;
}

private BiFunction<byte[], Duration, CompletableFuture<byte[]>> nonBlockingRetrieveFunction(String cacheName) {
return isReactive() ? reactiveRetrieveFunction(cacheName) : asyncRetrieveFunction(cacheName);
}

// TODO: Possibly remove if we rely on the default Cache.retrieve(..) behavior
// after assessing RedisCacheWriter.isRetrieveSupported().
// Function applied for Cache.retrieve(key) when a non-reactive Redis driver is used, such as Jedis.
private BiFunction<byte[], Duration, CompletableFuture<byte[]>> asyncRetrieveFunction(String cacheName) {

return (key, ttl) -> {

Supplier<byte[]> getKey = () -> execute(cacheName, connection -> connection.stringCommands().get(key));

Supplier<byte[]> getKeyWithExpiration = () -> execute(cacheName, connection ->
connection.stringCommands().getEx(key, Expiration.from(ttl)));

return shouldExpireWithin(ttl)
? CompletableFuture.supplyAsync(getKeyWithExpiration)
: CompletableFuture.supplyAsync(getKey);

};
}

// Function applied for Cache.retrieve(key) when a reactive Redis driver is used, such as Lettuce.
private BiFunction<byte[], Duration, CompletableFuture<byte[]>> reactiveRetrieveFunction(String cacheName) {

return (key, ttl) -> {

ByteBuffer wrappedKey = ByteBuffer.wrap(key);

Flux<?> cacheLockCheckFlux = Flux.interval(Duration.ZERO, this.sleepTime).takeUntil(count ->
executeLockFree(connection -> !doCheckLock(cacheName, connection)));

Mono<ByteBuffer> getMono = shouldExpireWithin(ttl)
? executeReactively(connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
: executeReactively(connection -> connection.stringCommands().get(wrappedKey));

Mono<ByteBuffer> result = cacheLockCheckFlux.then(getMono);

@SuppressWarnings("all")
Mono<byte[]> byteArrayResult = result.map(DefaultRedisCacheWriter::nullSafeGetBytes);

return byteArrayResult.toFuture();
};
}

@Override
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {

Expand Down Expand Up @@ -282,32 +366,42 @@ private Long doUnlock(String name, RedisConnection connection) {
return connection.keyCommands().del(createCacheLockKey(name));
}

boolean doCheckLock(String name, RedisConnection connection) {
return isTrue(connection.keyCommands().exists(createCacheLockKey(name)));
}
private <T> T execute(String name, Function<RedisConnection, T> callback) {

/**
* @return {@literal true} if {@link RedisCacheWriter} uses locks.
*/
private boolean isLockingCacheWriter() {
return !sleepTime.isZero() && !sleepTime.isNegative();
try (RedisConnection connection = this.connectionFactory.getConnection()) {
checkAndPotentiallyWaitUntilUnlocked(name, connection);
return callback.apply(connection);
}
}

private <T> T execute(String name, Function<RedisConnection, T> callback) {
private <T> T executeLockFree(Function<RedisConnection, T> callback) {

try (RedisConnection connection = connectionFactory.getConnection()) {
checkAndPotentiallyWaitUntilUnlocked(name, connection);
try (RedisConnection connection = this.connectionFactory.getConnection()) {
return callback.apply(connection);
}
}

private void executeLockFree(Consumer<RedisConnection> callback) {
private <T> T executeReactively(Function<ReactiveRedisConnection, T> callback) {

try (RedisConnection connection = connectionFactory.getConnection()) {
callback.accept(connection);
ReactiveRedisConnection connection = getReactiveRedisConnectionFactory().getReactiveConnection();

try {
return callback.apply(connection);
}
finally {
connection.closeLater();
}
}

/**
* Determines whether this {@link RedisCacheWriter} uses locks during caching operations.
*
* @return {@literal true} if {@link RedisCacheWriter} uses locks.
*/
private boolean isLockingCacheWriter() {
return !this.sleepTime.isZero() && !this.sleepTime.isNegative();
}

private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {

if (!isLockingCacheWriter()) {
Expand All @@ -318,29 +412,46 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c

try {
while (doCheckLock(name, connection)) {
Thread.sleep(sleepTime.toMillis());
Thread.sleep(this.sleepTime.toMillis());
}
} catch (InterruptedException cause) {

// Re-interrupt current thread, to allow other participants to react.
// Re-interrupt current Thread to allow other participants to react.
Thread.currentThread().interrupt();

String message = String.format("Interrupted while waiting to unlock cache %s", name);

throw new PessimisticLockingFailureException(message, cause);
} finally {
statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
this.statistics.incLockTime(name, System.nanoTime() - lockWaitTimeNs);
}
}

boolean doCheckLock(String name, RedisConnection connection) {
return isTrue(connection.keyCommands().exists(createCacheLockKey(name)));
}

private boolean isReactive() {
return this.connectionFactory instanceof ReactiveRedisConnectionFactory;
}

private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory() {
return (ReactiveRedisConnectionFactory) this.connectionFactory;
}

private static byte[] createCacheLockKey(String name) {
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}

private boolean isTrue(@Nullable Boolean value) {
private static boolean isTrue(@Nullable Boolean value) {
return Boolean.TRUE.equals(value);
}

@Nullable
private static byte[] nullSafeGetBytes(@Nullable ByteBuffer value) {
return value != null ? ByteUtils.getBytes(value) : null;
}

private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
Expand Down
28 changes: 26 additions & 2 deletions src/main/java/org/springframework/data/redis/cache/RedisCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.springframework.util.ReflectionUtils;

/**
* {@link org.springframework.cache.Cache} implementation using for Redis as the underlying store for cache data.
* {@link AbstractValueAdaptingCache Cache} implementation using Redis as the underlying store for cache data.
* <p>
* Use {@link RedisCacheManager} to create {@link RedisCache} instances.
*
Expand All @@ -61,7 +61,7 @@
@SuppressWarnings("unused")
public class RedisCache extends AbstractValueAdaptingCache {

private static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);
static final byte[] BINARY_NULL_VALUE = RedisSerializer.java().serialize(NullValue.INSTANCE);

private final Lock lock = new ReentrantLock();

Expand Down Expand Up @@ -293,14 +293,38 @@ protected Object preProcessCacheValue(@Nullable Object value) {

@Override
public CompletableFuture<?> retrieve(Object key) {

if (getCacheWriter().isRetrieveSupported()) {
return retrieveValue(key).thenApply(this::nullSafeDeserializedStoreValue);
}

return super.retrieve(key);
}

@Override
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {

if (getCacheWriter().isRetrieveSupported()) {
return retrieveValue(key)
.thenApply(this::nullSafeDeserializedStoreValue)
.thenCompose(cachedValue -> cachedValue != null
? CompletableFuture.completedFuture((T) cachedValue)
: valueLoader.get());
}

return super.retrieve(key, valueLoader);
}

CompletableFuture<byte[]> retrieveValue(Object key) {
return getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key));
}

@Nullable
Object nullSafeDeserializedStoreValue(@Nullable byte[] value) {
return value != null ? fromStoreValue(deserializeCacheValue(value)) : null;
}

/**
* Serialize the given {@link String cache key}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.springframework.data.redis.cache;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -135,6 +137,51 @@ default byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return get(name, key);
}

/**
* Determines whether the asynchronous {@link #retrieve(String, byte[])}
* and {@link #retrieve(String, byte[], Duration)} cache operations are supported by the implementation.
* <p>
* The main factor for whether the {@literal retrieve} operation can be supported will primarily be determined by
* the Redis driver in use at runtime.
* <p>
* Returns {@literal false} by default. This will have an effect of {@link RedisCache#retrieve(Object)}
* and {@link RedisCache#retrieve(Object, Supplier)} throwing an {@link UnsupportedOperationException}.
*
* @return {@literal true} if asynchronous {@literal retrieve} operations are supported by the implementation.
*/
default boolean isRetrieveSupported() {
return false;
}

/**
* Returns the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* <p>
* This operation is non-blocking.
*
* @param name {@link String} with the name of the {@link RedisCache}.
* @param key {@link byte[] key} mapped to the {@link CompletableFuture value} in the {@link RedisCache}.
* @return the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* @see #retrieve(String, byte[], Duration)
* @since 3.2.0
*/
default CompletableFuture<byte[]> retrieve(String name, byte[] key) {
return retrieve(name, key, null);
}

/**
* Returns the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}
* setting the {@link Duration TTL expiration} for the cache entry.
* <p>
* This operation is non-blocking.
*
* @param name {@link String} with the name of the {@link RedisCache}.
* @param key {@link byte[] key} mapped to the {@link CompletableFuture value} in the {@link RedisCache}.
* @param ttl {@link Duration} specifying the {@literal expiration timeout} for the cache entry.
* @return the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* @since 3.2.0
*/
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);

/**
* Write the given key/value pair to Redis and set the expiration time if defined.
*
Expand Down
Loading

0 comments on commit a72c426

Please sign in to comment.