Skip to content

Commit

Permalink
Made TSC init work
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed Dec 5, 2024
1 parent a89a38c commit f4d3504
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.service.CacheService;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
Expand Down Expand Up @@ -167,16 +168,8 @@ public TieredQueryCache(CacheService cacheService, Settings settings, ClusterSer
.setKeyType(CompositeKey.class)
.setRemovalListener(removalListener)
.setDimensionNames(List.of(SHARD_ID_DIMENSION_NAME))
.setSegmentCount(1) // TODO: REMOVE!! For testing only
/*.setCachedResultParser((bytesReference) -> {
try {
return CachedQueryResult.getPolicyValues(bytesReference);
} catch (IOException e) {
// Set took time to -1, which will always be rejected by the policy.
return new CachedQueryResult.PolicyValues(-1);
}
})*/ // TODO - i forgor that this is unfortunately hardcoded to always return CachedQueryResult.
// But for the proof of concept we can hack around that by just adding what we need? Or do we actually need anything?
// TODO: I dont know what to do with this. This shouldn't be hardcoded like that...
.setCachedResultParser((cacheAndCount) -> new CachedQueryResult.PolicyValues(1))
.setKeySerializer(keySerializer)
.setValueSerializer(new CacheAndCountSerializer())
.setClusterSettings(clusterService.getClusterSettings())
Expand All @@ -200,7 +193,6 @@ protected CacheAndCount get(Query query, IndexReader.CacheHelper cacheHelper) {
if (leafCache == null) {
String shardIdName = getShardIdName(readerKey);
outerCacheMissCounts.computeIfAbsent(shardIdName, (k) -> new LongAdder()).add(1);
// onMiss(readerKey, query);
return null;
}
// Singleton stuff would go here if I decide it's needed
Expand Down Expand Up @@ -327,6 +319,11 @@ private String getShardIdName(Object readerCoreKey) {
return shardKeyMap.getShardId(readerCoreKey).toString();
}

// pkg-private for testing
ICache<CompositeKey, CacheAndCount> getInnerCache() {
return innerCache;
}

private class TieredCachingWrapperWeight extends ConstantScoreWeight {
// TODO - based on LRUQC CachingWrapperWeight, but it uses *this* class's get() to get the actual value from cache when needed
private final Weight in;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;

Expand Down Expand Up @@ -249,7 +250,8 @@ public V get(ICacheKey<K> key) {
@Override
public void put(ICacheKey<K> key, V value) {
// First check in case the key is already present in either of tiers.
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(true).apply(key);
// TODO: I don't think this should capture stats - changed to false.
Tuple<V, String> cacheValueTuple = getValueFromTieredCache(false).apply(key);
if (cacheValueTuple == null) {
// In case it is not present in any tier, put it inside onHeap cache by default.
try (ReleasableLock ignore = writeLock.acquire()) {
Expand Down Expand Up @@ -828,16 +830,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
long diskCacheSize = TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
.get(settings);

return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
Builder<K, V> builder = new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.setNumberOfSegments(numberOfSegments)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType))
.setOnHeapCacheSizeInBytes(onHeapCacheSize)
.setDiskCacheSize(diskCacheSize)
.build();
.setDiskCacheSize(diskCacheSize);

try {
builder.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType));
} catch (SettingsException ignored) {
// TODO: if not registered (for query cache), skip the took time threshold policy.
}
return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,41 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.store.Directory;
import org.opensearch.cache.common.tier.MockDiskCache;
import org.opensearch.cache.common.tier.TieredSpilloverCache;
import org.opensearch.cache.common.tier.TieredSpilloverCachePlugin;
import org.opensearch.cache.common.tier.TieredSpilloverCacheSettings;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.ICacheKey;
import org.opensearch.common.cache.module.CacheModule;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.node.Node;
import org.opensearch.plugins.CachePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS;

public class TieredQueryCacheTests extends OpenSearchSingleNodeTestCase {

Expand Down Expand Up @@ -75,6 +91,64 @@ public void testBasics_WithOpenSearchOnHeapCache() throws IOException {
TieredQueryCache cache = getQueryCache(settings);
s.setQueryCache(cache);

ICache<TieredQueryCache.CompositeKey, TieredQueryCache.CacheAndCount> innerCache = cache.getInnerCache();
assertTrue(innerCache instanceof OpenSearchOnHeapCache);

testBasicsDummyQuery(cache, s, shard);

// TODO: not implementing shard closing logic for the PoC
/*IOUtils.close(r, dir);
// got emptied, but no changes to other metrics
stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(20L, stats.getCacheCount());
assertEquals(1L, stats.getHitCount());
assertEquals(40L, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
cache.onClose(shard);
// forgot everything
stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(0L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(0L, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() >= 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);*/

cache.close(); // this triggers some assertions
IOUtils.close(r, dir);
}

public void testBasics_WithTSC_WithSmallHeapSize() throws Exception {
// TODO: Check all the logic works when TSC is innerCache and can only fit a few keys into its heap tier (aka test the serializers
// work.)

threadPool = getThreadPool();
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w);
w.close();
ShardId shard = new ShardId("index", "_na_", 0);
r = OpenSearchDirectoryReader.wrap(r, shard);
IndexSearcher s = new IndexSearcher(r);
s.setQueryCachingPolicy(alwaysCachePolicy());

TieredQueryCache cache = getQueryCache(getTSCSettings(1000));
s.setQueryCache(cache);

ICache<TieredQueryCache.CompositeKey, TieredQueryCache.CacheAndCount> innerCache = cache.getInnerCache();
assertTrue(innerCache instanceof TieredSpilloverCache);

testBasicsDummyQuery(cache, s, shard);

cache.close();
IOUtils.close(r, dir);
}

private void testBasicsDummyQuery(TieredQueryCache cache, IndexSearcher s, ShardId shard) throws IOException {
QueryCacheStats stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(0L, stats.getCacheCount());
Expand All @@ -89,6 +163,8 @@ public void testBasics_WithOpenSearchOnHeapCache() throws IOException {
assertEquals(1L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(2L, stats.getMissCount()); // TODO: Why is there 2x misses per s.count()?
// TODO: returning 3 not 2. 1 from TSC and 2 from outer misses. Doesnt seem to be a TSC stats holder problem.
// In original, there were 2 from outer misses (this is as we expect - confirmed it all looks right at end) and 0 from inner misses.
assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);

int numEntries = 3;
Expand All @@ -113,47 +189,58 @@ public void testBasics_WithOpenSearchOnHeapCache() throws IOException {
assertEquals(1L, stats.getHitCount());
assertEquals(2 * numEntries, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);

// TODO: not implementing shard closing logic for the PoC
/*IOUtils.close(r, dir);
// got emptied, but no changes to other metrics
stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(20L, stats.getCacheCount());
assertEquals(1L, stats.getHitCount());
assertEquals(40L, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);
cache.onClose(shard);
// forgot everything
stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(0L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(0L, stats.getMissCount());
assertTrue(stats.getMemorySizeInBytes() >= 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);*/

cache.close(); // this triggers some assertions
IOUtils.close(r, dir);
}

public void testBasics_WithTSC_WithSmallHeapSize() throws Exception {
// TODO: Check all the logic works when TSC is innerCache and can only fit a few keys into its heap tier (aka test the serializers
// work.)
}

public void testBasicsWithLongPointRangeQuery() throws Exception {
// TODO
}

private Settings getTSCSettings(int heapBytes) {
return Settings.builder()
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_QUERY_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
heapBytes + "b"
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(
CacheType.INDICES_QUERY_CACHE.getSettingPrefix()
).getKey(),
1
)
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.build();

}

private TieredQueryCache getQueryCache(Settings settings) throws IOException {
try (NodeEnvironment env = newNodeEnvironment(settings)) {
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
clusterService.getClusterSettings().registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_QUERY_CACHE));
/*clusterService.getClusterSettings().put(
Settings.builder().put(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_QUERY_CACHE).getKey(), true).build()
);*/ // TODO: This isn't actually applying anything to the clusterService...
return new TieredQueryCache(
new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(),
new CacheModule(List.of(new TieredSpilloverCachePlugin(settings), new MockDiskCachePlugin()), settings).getCacheService(),
settings,
ClusterServiceUtils.createClusterService(threadPool),
clusterService,
env
);
}
Expand All @@ -174,4 +261,20 @@ public boolean shouldCache(Query query) {
}
};
}

// Duplicated from TieredSpilloverCacheIT.java
public static class MockDiskCachePlugin extends Plugin implements CachePlugin {

public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
}

@Override
public String getName() {
return "mock_disk_plugin";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down

0 comments on commit f4d3504

Please sign in to comment.