Skip to content

Commit

Permalink
Implement retrieve(key) and retrieve(key, :Supplier<CompletableFuture…
Browse files Browse the repository at this point in the history
…<T>>) operations in RedisCache.

Closes spring-projects#2650
  • Loading branch information
jxblum committed Oct 3, 2023
1 parent 1c21d22 commit 4f2a1ee
Show file tree
Hide file tree
Showing 6 changed files with 544 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@
*/
package org.springframework.data.redis.cache;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import reactor.core.publisher.Mono;

/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
Expand Down Expand Up @@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
Assert.notNull(key, "Key must not be null");

byte[] result = shouldExpireWithin(ttl)
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));

statistics.incGets(name);

Expand All @@ -128,6 +137,81 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return result;
}

@Override
public CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl) {

Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");

CompletableFuture<byte[]> result = nonBlockingExecutionStrategy(name).apply(key, ttl);

result = result.thenApply(cachedValue -> {

statistics.incGets(name);

if (cachedValue != null) {
statistics.incHits(name);
} else {
statistics.incMisses(name);
}

return cachedValue;
});

return result;
}

private BiFunction<byte[], Duration, CompletableFuture<byte[]>> nonBlockingExecutionStrategy(String cacheName) {
return isReactiveAvailable() ? reactiveExecutionStrategy(cacheName) : asyncExecutionStrategy(cacheName);
}

// Execution Strategy applied when a non-reactive Redis driver is used, such as Jedis.
// Treats API consistently (that is, using Reactive types, such as Mono) at the RedisCacheWriter level
// whether "technically Reactive" or not; clearly Jedis is not "Reactive".
private BiFunction<byte[], Duration, CompletableFuture<byte[]>> asyncExecutionStrategy(String cacheName) {

return (key, ttl) -> {

Supplier<byte[]> getKey = () -> execute(cacheName, connection -> connection.stringCommands().get(key));

Supplier<byte[]> getKeyWithExpiration = () -> execute(cacheName, connection ->
connection.stringCommands().getEx(key, Expiration.from(ttl)));

// NOTE: CompletableFuture.supplyAsync(:Supplier) is necessary in this case to prevent blocking
// on Mono.subscribe(:Consumer).
return shouldExpireWithin(ttl)
? CompletableFuture.supplyAsync(getKeyWithExpiration)
: CompletableFuture.supplyAsync(getKey);

};
}

// Execution Strategy applied when Lettuce (Reactive driver) is used.
// Be careful to do this in a "non-blocking way", but still taking the "named" cache lock into consideration.
private BiFunction<byte[], Duration, CompletableFuture<byte[]>> reactiveExecutionStrategy(String cacheName) {

return (key, ttl) -> {

ByteBuffer wrappedKey = ByteBuffer.wrap(key);

// Do the same lock check as the regular Cache.get(key); be careful of blocking!
Mono<ByteBuffer> cacheLockCheckMono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
executeLockFree(connection -> checkAndPotentiallyWaitUntilUnlocked(cacheName, connection));
return ByteBuffer.wrap(new byte[0]);
}));

Mono<ByteBuffer> getMono = shouldExpireWithin(ttl)
? executeReactively(connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
: executeReactively(connection -> connection.stringCommands().get(wrappedKey));

Mono<ByteBuffer> result = cacheLockCheckMono.then(getMono);

@SuppressWarnings("all")
Mono<byte[]> byteArrayResult = result.map(DefaultRedisCacheWriter::nullSafeGetBytes);

return byteArrayResult.toFuture();
};
}
@Override
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {

Expand Down Expand Up @@ -308,6 +392,18 @@ private void executeLockFree(Consumer<RedisConnection> callback) {
}
}

private <T> T executeReactively(Function<ReactiveRedisConnection, T> callback) {

ReactiveRedisConnection connection = getReactiveRedisConnectionFactory().getReactiveConnection();

try {
return callback.apply(connection);
}
finally {
connection.closeLater();
}
}

private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {

if (!isLockingCacheWriter()) {
Expand All @@ -333,14 +429,27 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
}
}

private boolean isReactiveAvailable() {
return this.connectionFactory instanceof ReactiveRedisConnectionFactory;
}

private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory() {
return (ReactiveRedisConnectionFactory) this.connectionFactory;
}

private static byte[] createCacheLockKey(String name) {
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}

private boolean isTrue(@Nullable Boolean value) {
private static boolean isTrue(@Nullable Boolean value) {
return Boolean.TRUE.equals(value);
}

@Nullable
private static byte[] nullSafeGetBytes(@Nullable ByteBuffer value) {
return value != null ? ByteUtils.getBytes(value) : null;
}

private static boolean shouldExpireWithin(@Nullable Duration ttl) {
return ttl != null && !ttl.isZero() && !ttl.isNegative();
}
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/org/springframework/data/redis/cache/RedisCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;

import reactor.core.publisher.Mono;

/**
* {@link org.springframework.cache.Cache} implementation using for Redis as the underlying store for cache data.
* {@link AbstractValueAdaptingCache Cache} implementation using Redis as the underlying store for cache data.
* <p>
* Use {@link RedisCacheManager} to create {@link RedisCache} instances.
*
Expand Down Expand Up @@ -293,12 +295,22 @@ protected Object preProcessCacheValue(@Nullable Object value) {

@Override
public CompletableFuture<?> retrieve(Object key) {
return super.retrieve(key);
return retrieveValue(key).thenApply(this::deserializeCacheValue);
}

@Override
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
return super.retrieve(key, valueLoader);

return retrieveValue(key)
.thenApply(this::deserializeCacheValue)
.thenCompose(cachedValue -> cachedValue != null
? CompletableFuture.completedFuture((T) cachedValue)
: valueLoader.get());
}

private CompletableFuture<byte[]> retrieveValue(Object key) {
return getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.redis.cache;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -135,6 +136,35 @@ default byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return get(name, key);
}

/**
* Returns the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* <p>
* This operation is non-blocking.
*
* @param name {@link String} with the name of the {@link RedisCache}.
* @param key {@link byte[] key} mapped to the {@link CompletableFuture value} in the {@link RedisCache}.
* @return the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* @see #retrieve(String, byte[], Duration)
* @since 3.2.0
*/
default CompletableFuture<byte[]> retrieve(String name, byte[] key) {
return retrieve(name, key, null);
}

/**
* Returns the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}
* setting the {@link Duration TTL expiration} for the cache entry.
* <p>
* This operation is non-blocking.
*
* @param name {@link String} with the name of the {@link RedisCache}.
* @param key {@link byte[] key} mapped to the {@link CompletableFuture value} in the {@link RedisCache}.
* @param ttl {@link Duration} specifying the {@literal expiration timeout} for the cache entry.
* @return the {@link CompletableFuture value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* @since 3.2.0
*/
CompletableFuture<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);

/**
* Write the given key/value pair to Redis and set the expiration time if defined.
*
Expand Down
Loading

0 comments on commit 4f2a1ee

Please sign in to comment.