From f4d3504cc703fc2ad81dad05185833785ca742bd Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Thu, 5 Dec 2024 15:51:51 -0800 Subject: [PATCH] Made TSC init work Signed-off-by: Peter Alfonsi --- .../cache/common/query/TieredQueryCache.java | 19 +- .../common/tier/TieredSpilloverCache.java | 17 +- .../common/query/TieredQueryCacheTests.java | 165 ++++++++++++++---- .../cache/common/tier/MockDiskCache.java | 1 + 4 files changed, 155 insertions(+), 47 deletions(-) diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/query/TieredQueryCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/query/TieredQueryCache.java index e63f89b18b60b..b393b64eb57ba 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/query/TieredQueryCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/query/TieredQueryCache.java @@ -43,6 +43,7 @@ import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; +import org.opensearch.common.cache.policy.CachedQueryResult; import org.opensearch.common.cache.serializer.Serializer; import org.opensearch.common.cache.service.CacheService; import org.opensearch.common.cache.stats.ImmutableCacheStats; @@ -167,16 +168,8 @@ public TieredQueryCache(CacheService cacheService, Settings settings, ClusterSer .setKeyType(CompositeKey.class) .setRemovalListener(removalListener) .setDimensionNames(List.of(SHARD_ID_DIMENSION_NAME)) - .setSegmentCount(1) // TODO: REMOVE!! For testing only - /*.setCachedResultParser((bytesReference) -> { - try { - return CachedQueryResult.getPolicyValues(bytesReference); - } catch (IOException e) { - // Set took time to -1, which will always be rejected by the policy. - return new CachedQueryResult.PolicyValues(-1); - } - })*/ // TODO - i forgor that this is unfortunately hardcoded to always return CachedQueryResult. 
- // But for the proof of concept we can hack around that by just adding what we need? Or do we actually need anything? + // TODO: I don't know what to do with this. This shouldn't be hardcoded like that... + .setCachedResultParser((cacheAndCount) -> new CachedQueryResult.PolicyValues(1)) .setKeySerializer(keySerializer) .setValueSerializer(new CacheAndCountSerializer()) .setClusterSettings(clusterService.getClusterSettings()) @@ -200,7 +193,6 @@ protected CacheAndCount get(Query query, IndexReader.CacheHelper cacheHelper) { if (leafCache == null) { String shardIdName = getShardIdName(readerKey); outerCacheMissCounts.computeIfAbsent(shardIdName, (k) -> new LongAdder()).add(1); - // onMiss(readerKey, query); return null; } // Singleton stuff would go here if I decide it's needed @@ -327,6 +319,11 @@ private String getShardIdName(Object readerCoreKey) { return shardKeyMap.getShardId(readerCoreKey).toString(); } + // pkg-private for testing + ICache getInnerCache() { + return innerCache; + } + private class TieredCachingWrapperWeight extends ConstantScoreWeight { // TODO - based on LRUQC CachingWrapperWeight, but it uses *this* class's get() to get the actual value from cache when needed private final Weight in; diff --git a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java index 38a6915ffd10e..fb615bc4a8ab5 100644 --- a/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java +++ b/modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java @@ -25,6 +25,7 @@ import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ReleasableLock; @@ 
-249,7 +250,8 @@ public V get(ICacheKey key) { @Override public void put(ICacheKey key, V value) { // First check in case the key is already present in either of tiers. - Tuple cacheValueTuple = getValueFromTieredCache(true).apply(key); + // TODO: I don't think this should capture stats - changed to false. + Tuple cacheValueTuple = getValueFromTieredCache(false).apply(key); if (cacheValueTuple == null) { // In case it is not present in any tier, put it inside onHeap cache by default. try (ReleasableLock ignore = writeLock.acquire()) { @@ -828,16 +830,21 @@ public ICache create(CacheConfig config, CacheType cacheType, long diskCacheSize = TIERED_SPILLOVER_DISK_STORE_SIZE.getConcreteSettingForNamespace(cacheType.getSettingPrefix()) .get(settings); - return new Builder().setDiskCacheFactory(diskCacheFactory) + Builder builder = new Builder().setDiskCacheFactory(diskCacheFactory) .setOnHeapCacheFactory(onHeapCacheFactory) .setRemovalListener(config.getRemovalListener()) .setCacheConfig(config) .setCacheType(cacheType) .setNumberOfSegments(numberOfSegments) - .addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType)) .setOnHeapCacheSizeInBytes(onHeapCacheSize) - .setDiskCacheSize(diskCacheSize) - .build(); + .setDiskCacheSize(diskCacheSize); + + try { + builder.addPolicy(new TookTimePolicy(diskPolicyThreshold, cachedResultParser, config.getClusterSettings(), cacheType)); + } catch (SettingsException ignored) { + // TODO: if not registered (for query cache), skip the took time threshold policy. 
+ } + return builder.build(); } @Override diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/query/TieredQueryCacheTests.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/query/TieredQueryCacheTests.java index e68bf99492aea..341a3239335d3 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/query/TieredQueryCacheTests.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/query/TieredQueryCacheTests.java @@ -16,18 +16,28 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.store.Directory; +import org.opensearch.cache.common.tier.MockDiskCache; +import org.opensearch.cache.common.tier.TieredSpilloverCache; +import org.opensearch.cache.common.tier.TieredSpilloverCachePlugin; +import org.opensearch.cache.common.tier.TieredSpilloverCacheSettings; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.cache.CacheType; +import org.opensearch.common.cache.ICache; +import org.opensearch.common.cache.ICacheKey; import org.opensearch.common.cache.module.CacheModule; import org.opensearch.common.cache.settings.CacheSettings; import org.opensearch.common.cache.store.OpenSearchOnHeapCache; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.node.Node; +import org.opensearch.plugins.CachePlugin; +import org.opensearch.plugins.Plugin; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.ThreadPool; @@ -35,6 +45,12 @@ import java.io.IOException; 
import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP; +import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS; public class TieredQueryCacheTests extends OpenSearchSingleNodeTestCase { @@ -75,6 +91,64 @@ public void testBasics_WithOpenSearchOnHeapCache() throws IOException { TieredQueryCache cache = getQueryCache(settings); s.setQueryCache(cache); + ICache innerCache = cache.getInnerCache(); + assertTrue(innerCache instanceof OpenSearchOnHeapCache); + + testBasicsDummyQuery(cache, s, shard); + + // TODO: not implementing shard closing logic for the PoC + /*IOUtils.close(r, dir); + + // got emptied, but no changes to other metrics + stats = cache.getStats(shard); + assertEquals(0L, stats.getCacheSize()); + assertEquals(20L, stats.getCacheCount()); + assertEquals(1L, stats.getHitCount()); + assertEquals(40L, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE); + + cache.onClose(shard); + + // forgot everything + stats = cache.getStats(shard); + assertEquals(0L, stats.getCacheSize()); + assertEquals(0L, stats.getCacheCount()); + assertEquals(0L, stats.getHitCount()); + assertEquals(0L, stats.getMissCount()); + assertTrue(stats.getMemorySizeInBytes() >= 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);*/ + + cache.close(); // this triggers some assertions + IOUtils.close(r, dir); + } + + public void testBasics_WithTSC_WithSmallHeapSize() throws Exception { + // TODO: Check all the logic works when TSC is innerCache and can only fit a few keys into its heap tier (aka test the serializers + // work.) 
+ + threadPool = getThreadPool(); + Directory dir = newDirectory(); + IndexWriter w = new IndexWriter(dir, newIndexWriterConfig()); + w.addDocument(new Document()); + DirectoryReader r = DirectoryReader.open(w); + w.close(); + ShardId shard = new ShardId("index", "_na_", 0); + r = OpenSearchDirectoryReader.wrap(r, shard); + IndexSearcher s = new IndexSearcher(r); + s.setQueryCachingPolicy(alwaysCachePolicy()); + + TieredQueryCache cache = getQueryCache(getTSCSettings(1000)); + s.setQueryCache(cache); + + ICache innerCache = cache.getInnerCache(); + assertTrue(innerCache instanceof TieredSpilloverCache); + + testBasicsDummyQuery(cache, s, shard); + + cache.close(); + IOUtils.close(r, dir); + } + + private void testBasicsDummyQuery(TieredQueryCache cache, IndexSearcher s, ShardId shard) throws IOException { QueryCacheStats stats = cache.getStats(shard); assertEquals(0L, stats.getCacheSize()); assertEquals(0L, stats.getCacheCount()); @@ -89,6 +163,8 @@ public void testBasics_WithOpenSearchOnHeapCache() throws IOException { assertEquals(1L, stats.getCacheCount()); assertEquals(0L, stats.getHitCount()); assertEquals(2L, stats.getMissCount()); // TODO: Why is there 2x misses per s.count()? + // TODO: returning 3 not 2. 1 from TSC and 2 from outer misses. Doesn't seem to be a TSC stats holder problem. + // In original, there were 2 from outer misses (this is as we expect - confirmed it all looks right at end) and 0 from inner misses. 
assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE); int numEntries = 3; @@ -113,47 +189,58 @@ public void testBasics_WithOpenSearchOnHeapCache() throws IOException { assertEquals(1L, stats.getHitCount()); assertEquals(2 * numEntries, stats.getMissCount()); assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE); - - // TODO: not implementing shard closing logic for the PoC - /*IOUtils.close(r, dir); - - // got emptied, but no changes to other metrics - stats = cache.getStats(shard); - assertEquals(0L, stats.getCacheSize()); - assertEquals(20L, stats.getCacheCount()); - assertEquals(1L, stats.getHitCount()); - assertEquals(40L, stats.getMissCount()); - assertTrue(stats.getMemorySizeInBytes() > 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE); - - cache.onClose(shard); - - // forgot everything - stats = cache.getStats(shard); - assertEquals(0L, stats.getCacheSize()); - assertEquals(0L, stats.getCacheCount()); - assertEquals(0L, stats.getHitCount()); - assertEquals(0L, stats.getMissCount()); - assertTrue(stats.getMemorySizeInBytes() >= 0L && stats.getMemorySizeInBytes() < Long.MAX_VALUE);*/ - - cache.close(); // this triggers some assertions - IOUtils.close(r, dir); - } - - public void testBasics_WithTSC_WithSmallHeapSize() throws Exception { - // TODO: Check all the logic works when TSC is innerCache and can only fit a few keys into its heap tier (aka test the serializers - // work.) 
} public void testBasicsWithLongPointRangeQuery() throws Exception { // TODO } + private Settings getTSCSettings(int heapBytes) { + return Settings.builder() + .put( + CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_QUERY_CACHE).getKey(), + TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_QUERY_CACHE.getSettingPrefix() + ).getKey(), + OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace( + CacheType.INDICES_QUERY_CACHE.getSettingPrefix() + ).getKey(), + MockDiskCache.MockDiskCacheFactory.NAME + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace( + CacheType.INDICES_QUERY_CACHE.getSettingPrefix() + ).getKey(), + heapBytes + "b" + ) + .put( + TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace( + CacheType.INDICES_QUERY_CACHE.getSettingPrefix() + ).getKey(), + 1 + ) + .put(FeatureFlags.PLUGGABLE_CACHE, "true") + .build(); + + } + private TieredQueryCache getQueryCache(Settings settings) throws IOException { try (NodeEnvironment env = newNodeEnvironment(settings)) { + ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); + clusterService.getClusterSettings().registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_QUERY_CACHE)); + /*clusterService.getClusterSettings().put( + Settings.builder().put(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_QUERY_CACHE).getKey(), true).build() + );*/ // TODO: This isn't actually applying anything to the clusterService... 
return new TieredQueryCache( - new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(), + new CacheModule(List.of(new TieredSpilloverCachePlugin(settings), new MockDiskCachePlugin()), settings).getCacheService(), settings, - ClusterServiceUtils.createClusterService(threadPool), + clusterService, env ); } @@ -174,4 +261,20 @@ public boolean shouldCache(Query query) { } }; } + + // Duplicated from TieredSpilloverCacheIT.java + public static class MockDiskCachePlugin extends Plugin implements CachePlugin { + + public MockDiskCachePlugin() {} + + @Override + public Map getCacheFactoryMap() { + return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1)); + } + + @Override + public String getName() { + return "mock_disk_plugin"; + } + } } diff --git a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java index fcddd489a27aa..3b6a1f5fd0e96 100644 --- a/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java +++ b/modules/cache-common/src/test/java/org/opensearch/cache/common/tier/MockDiskCache.java @@ -23,6 +23,7 @@ import org.opensearch.common.cache.store.builders.ICacheBuilder; import org.opensearch.common.cache.store.config.CacheConfig; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map;