Skip to content

Commit

Permalink
Implement retrieve(key) and retrieve(key, :Supplier<CompletableFuture…
Browse files Browse the repository at this point in the history
…<T>>) operations in RedisCache.

Closes spring-projects#2650
  • Loading branch information
jxblum committed Sep 23, 2023
1 parent 624d7c6 commit d87815f
Show file tree
Hide file tree
Showing 6 changed files with 537 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@
*/
package org.springframework.data.redis.cache;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import reactor.core.publisher.Mono;

/**
* {@link RedisCacheWriter} implementation capable of reading/writing binary data from/to Redis in {@literal standalone}
* and {@literal cluster} environments, and uses a given {@link RedisConnectionFactory} to obtain the actual
Expand Down Expand Up @@ -114,8 +123,8 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
Assert.notNull(key, "Key must not be null");

byte[] result = shouldExpireWithin(ttl)
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));
? execute(name, connection -> connection.stringCommands().getEx(key, Expiration.from(ttl)))
: execute(name, connection -> connection.stringCommands().get(key));

statistics.incGets(name);

Expand All @@ -128,6 +137,77 @@ public byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return result;
}

@Override
public Mono<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl) {

Assert.notNull(name, "Name must not be null");
Assert.notNull(key, "Key must not be null");

Mono<byte[]> result = nonBlockingExecutionStrategy(name).apply(key, ttl);

result = result.doOnSuccess(byteBuffer -> {
if (byteBuffer != null) {
statistics.incHits(name);
}
else {
statistics.incMisses(name);
}
}).doFirst(() -> statistics.incGets(name));

return result;
}

private BiFunction<byte[], Duration, Mono<byte[]>> nonBlockingExecutionStrategy(String cacheName) {
return isReactiveAvailable() ? reactiveExecutionStrategy(cacheName) : asyncExecutionStrategy(cacheName);
}

// Execution Strategy applied when Jedis (non-Reactive driver) is used.
// Treats API consistently (that is, using Reactive types, such as Mono) at the RedisCacheWriter level
// whether "technically Reactive" or not; clearly Jedis is not "Reactive".
private BiFunction<byte[], Duration, Mono<byte[]>> asyncExecutionStrategy(String cacheName) {

return (key, ttl) -> {

Supplier<byte[]> getKey = () -> execute(cacheName, connection -> connection.stringCommands().get(key));

Supplier<byte[]> getKeyWithExpiration = () -> execute(cacheName, connection ->
connection.stringCommands().getEx(key, Expiration.from(ttl)));

// NOTE: CompletableFuture.supplyAsync(:Supplier) is necessary in this case to prevent blocking
// on Mono.subscribe(:Consumer).
return shouldExpireWithin(ttl)
? Mono.fromFuture(CompletableFuture.supplyAsync(getKeyWithExpiration))
: Mono.fromFuture(CompletableFuture.supplyAsync(getKey));
};
}

// Execution Strategy applied when Lettuce (Reactive driver) is used.
// Be careful to do this in a "non-blocking way", but still taking the "named" cache lock into consideration.
private BiFunction<byte[], Duration, Mono<byte[]>> reactiveExecutionStrategy(String cacheName) {

return (key, ttl) -> {

ByteBuffer wrappedKey = ByteBuffer.wrap(key);

Mono<ByteBuffer> result = shouldExpireWithin(ttl)
? executeReactively(connection -> connection.stringCommands().getEx(wrappedKey, Expiration.from(ttl)))
: executeReactively(connection -> connection.stringCommands().get(wrappedKey));

// Do the same lock check as the regular Cache.get(key); be careful of blocking!
result = result.doFirst(() -> executeLockFree(connection ->
checkAndPotentiallyWaitUntilUnlocked(cacheName, connection)));

@SuppressWarnings("all")
Mono<byte[]> byteArrayResult = result.map(DefaultRedisCacheWriter::nullSafeGetBytes);

return byteArrayResult;
};
}

@Nullable
private static byte[] nullSafeGetBytes(@Nullable ByteBuffer value) {
return value != null ? ByteUtils.getBytes(value) : null;
}
@Override
public void put(String name, byte[] key, byte[] value, @Nullable Duration ttl) {

Expand Down Expand Up @@ -308,6 +388,18 @@ private void executeLockFree(Consumer<RedisConnection> callback) {
}
}

private <T> T executeReactively(Function<ReactiveRedisConnection, T> callback) {

ReactiveRedisConnection connection = getReactiveRedisConnectionFactory().getReactiveConnection();

try {
return callback.apply(connection);
}
finally {
connection.closeLater();
}
}

private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection connection) {

if (!isLockingCacheWriter()) {
Expand All @@ -333,11 +425,19 @@ private void checkAndPotentiallyWaitUntilUnlocked(String name, RedisConnection c
}
}

private boolean isReactiveAvailable() {
return this.connectionFactory instanceof ReactiveRedisConnectionFactory;
}

private ReactiveRedisConnectionFactory getReactiveRedisConnectionFactory() {
return (ReactiveRedisConnectionFactory) this.connectionFactory;
}

private static byte[] createCacheLockKey(String name) {
return (name + "~lock").getBytes(StandardCharsets.UTF_8);
}

private boolean isTrue(@Nullable Boolean value) {
private static boolean isTrue(@Nullable Boolean value) {
return Boolean.TRUE.equals(value);
}

Expand Down
20 changes: 17 additions & 3 deletions src/main/java/org/springframework/data/redis/cache/RedisCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package org.springframework.data.redis.cache;

import reactor.core.publisher.Mono;

import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.time.Duration;
Expand Down Expand Up @@ -46,7 +48,7 @@
import org.springframework.util.ReflectionUtils;

/**
* {@link org.springframework.cache.Cache} implementation using for Redis as the underlying store for cache data.
* {@link AbstractValueAdaptingCache Cache} implementation using Redis as the underlying store for cache data.
* <p>
* Use {@link RedisCacheManager} to create {@link RedisCache} instances.
*
Expand Down Expand Up @@ -293,12 +295,24 @@ protected Object preProcessCacheValue(@Nullable Object value) {

@Override
public CompletableFuture<?> retrieve(Object key) {
return super.retrieve(key);

return retrieveMono(key)
.map(this::deserializeCacheValue)
.toFuture();
}

@Override
@SuppressWarnings("unchecked")
public <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> valueLoader) {
return super.retrieve(key, valueLoader);

return retrieveMono(key)
.map(value -> (T) deserializeCacheValue(value))
.switchIfEmpty(Mono.fromCompletionStage(valueLoader))
.toFuture();
}

private Mono<byte[]> retrieveMono(Object key) {
return getCacheWriter().retrieve(getName(), createAndConvertCacheKey(key));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import reactor.core.publisher.Mono;

/**
* {@link RedisCacheWriter} provides low-level access to Redis commands ({@code SET, SETNX, GET, EXPIRE,...}) used for
* caching.
Expand Down Expand Up @@ -135,6 +137,40 @@ default byte[] get(String name, byte[] key, @Nullable Duration ttl) {
return get(name, key);
}

/**
* Returns the {@link Mono value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* <p>
* This operation does not block.
*
* @param name {@link String} containing the name of the {@link RedisCache}.
* @param key {@link byte[] key} mapped to the {@link Mono value} in the {@link RedisCache}.
* @return the {@link Mono value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* @throws IllegalStateException if the Redis connection factory is not reactive.
* @see reactor.core.publisher.Mono
* @see java.nio.ByteBuffer
* @since 3.2.0
*/
default Mono<byte[]> retrieve(String name, byte[] key) {
return retrieve(name, key, null);
}

/**
* Returns the {@link Mono value} to which the {@link RedisCache} maps the given {@link byte[] key}
* setting the {@link Duration TTL expiration} for the cache entry.
* <p>
* This operation does not block.
*
* @param name {@link String} containing the name of the {@link RedisCache}.
* @param key {@link byte[] key} mapped to the {@link Mono value} in the {@link RedisCache}.
* @param ttl {@link Duration} specifying the {@literal expiration timeout} for the cache entry.
* @return the {@link Mono value} to which the {@link RedisCache} maps the given {@link byte[] key}.
* @throws IllegalStateException if the Redis connection factory is not reactive.
* @see reactor.core.publisher.Mono
* @see java.nio.ByteBuffer
* @since 3.2.0
*/
Mono<byte[]> retrieve(String name, byte[] key, @Nullable Duration ttl);

/**
* Write the given key/value pair to Redis and set the expiration time if defined.
*
Expand Down
Loading

0 comments on commit d87815f

Please sign in to comment.