sashapolo commented on code in PR #7513:
URL: https://github.com/apache/ignite-3/pull/7513#discussion_r2773634369

##########
modules/page-memory/src/jmh/java/org/apache/ignite/internal/pagememory/benchmark/PageReplacementBenchmark.java:
##########
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.benchmark;
+
+import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
+
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.pagememory.TestPageIoModule.TestSimpleValuePageIo;
+import org.apache.ignite.internal.pagememory.configuration.ReplacementMode;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
+import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState;
+import org.apache.ignite.internal.util.Constants;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.infra.ThreadParams;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+/**
+ * Benchmark for page cache replacement policies (CLOCK, SEGMENTED_LRU, RANDOM_LRU).
+ *
+ * <p>Tests how well each policy handles page evictions when the working set is larger than
+ * available memory. Uses a realistic access pattern where some pages are hot (frequently accessed)
+ * and others are cold.
+ *
+ * <p>Limitations: read-only workload, checkpoint lock held during measurements, cache is pre-warmed.
+ */
+@Warmup(iterations = 5, time = 2)
+@Measurement(iterations = 10, time = 2)
+@Fork(1)
+@State(Scope.Benchmark)
+public class PageReplacementBenchmark extends PersistentPageMemoryBenchmarkBase {
+    private static final long SMALL_REGION_SIZE = 20L * 1024 * 1024;
+    private static final int PAGE_SIZE = 4 * Constants.KiB;
+    private static final long REGION_CAPACITY_PAGES = SMALL_REGION_SIZE / PAGE_SIZE;
+    private static final long MAX_WORKING_SET_SIZE = 1_000_000;
+
+    /** 0.99 = very skewed, most accesses hit few pages. */
+    private static final double ZIPFIAN_SKEW = 0.99;
+
+    private static final long BASE_SEED = 42L;
+
+    /** Different seed for warmup to avoid biasing results. */
+    private static final long WARMUP_SEED = BASE_SEED + 999999L;
+
+    private static final int PARTITION_COUNT = 16;
+
+    private static final double WARMUP_MULTIPLIER = 1.1;
+
+    private static final double MIN_WORKING_SET_RATIO = 0.1;
+
+    private static final int CHECKPOINT_TIMEOUT_SECONDS = 30;
+
+    @Param({"CLOCK", "SEGMENTED_LRU", "RANDOM_LRU"})
+    public ReplacementMode replacementModeParam;
+
+    @Param({"LOW", "MEDIUM", "HIGH"})
+    public CachePressure cachePressure;
+
+    private long[] pageIds;
+
+    private int workingSetSize;
+
+    private volatile MetricsSnapshot beforeMetrics;
+
+    // Use same seed across runs so all policies get the same partition distribution.
+    private final Random partitionRandom = new Random(BASE_SEED);
+
+    /** How much bigger the working set is compared to cache size. */
+    public enum CachePressure {
+        LOW(1.2),
+        MEDIUM(2.0),
+        HIGH(4.0);
+
+        private final double multiplier;
+
+        CachePressure(double multiplier) {
+            this.multiplier = multiplier;
+        }
+
+        int computeWorkingSetSize(long capacity) {
+            long result = Math.round(capacity * multiplier);
+            assert result <= Integer.MAX_VALUE : "Working set too large: " + result;
+            return (int) result;
+        }
+    }
+
+    /** Each thread has its own access pattern. */
+    @State(Scope.Thread)
+    public static class ThreadState {
+        private ZipfianDistribution zipfianDistribution;
+        private boolean checkpointLockAcquired;
+        private int threadIndex;
+        private PageReplacementBenchmark benchmark;
+
+        /** Setup trial. */
+        @Setup(Level.Trial)
+        public void setupTrial(ThreadParams threadParams, PageReplacementBenchmark benchmark) {
+            this.benchmark = benchmark;
+            this.threadIndex = threadParams.getThreadIndex();
+
+            long minWorkingSetSize = Math.round(REGION_CAPACITY_PAGES * MIN_WORKING_SET_RATIO);

Review Comment:
   Why is this check needed on each trial? This is a constant, and we can check it once, where we generate the working set.
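
   For example, a rough sketch of the alternative (untested, reusing the names from the diff above; the existing upper-bound check stays where it is):

   ```java
   // Validate once, during benchmark setup, where workingSetSize is derived,
   // instead of re-checking the same constants in every thread's trial setup.
   private void validateConfiguration() {
       workingSetSize = cachePressure.computeWorkingSetSize(REGION_CAPACITY_PAGES);

       long minWorkingSetSize = Math.round(REGION_CAPACITY_PAGES * MIN_WORKING_SET_RATIO);

       if (workingSetSize < minWorkingSetSize) {
           throw new IllegalStateException(String.format(
                   "Working set too small: %,d < minimum %,d (%.0f%% of capacity)",
                   workingSetSize, minWorkingSetSize, MIN_WORKING_SET_RATIO * 100));
       }

       // ... the existing MAX_WORKING_SET_SIZE check follows unchanged ...
   }
   ```
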
##########
modules/page-memory/src/jmh/java/org/apache/ignite/internal/pagememory/benchmark/PageReplacementBenchmark.java:
##########
@@ -0,0 +1,450 @@
+        /** Setup trial. */
+        @Setup(Level.Trial)
+        public void setupTrial(ThreadParams threadParams, PageReplacementBenchmark benchmark) {
+            this.benchmark = benchmark;
+            this.threadIndex = threadParams.getThreadIndex();
+
+            long minWorkingSetSize = Math.round(REGION_CAPACITY_PAGES * MIN_WORKING_SET_RATIO);
+            if (benchmark.workingSetSize < minWorkingSetSize) {
+                throw new IllegalStateException(String.format(
+                        "Benchmark not properly initialized: workingSetSize=%,d < minimum %,d (%.0f%% of capacity)",
+                        benchmark.workingSetSize, minWorkingSetSize, MIN_WORKING_SET_RATIO * 100
+                ));
+            }
+
+            // Give each thread a different seed so they don't all access the same pages.
+            long threadSeed = BASE_SEED + (threadIndex * 1000003L);
+
+            this.zipfianDistribution = new ZipfianDistribution(
+                    benchmark.workingSetSize,
+                    ZIPFIAN_SKEW,
+                    threadSeed
+            );
+        }
+
+        /**
+         * Acquire checkpoint lock before iteration to keep it out of measurements.
+         */
+        @Setup(Level.Iteration)
+        public void setupIteration() {
+            benchmark.checkpointManager().checkpointTimeoutLock().checkpointReadLock();
+            checkpointLockAcquired = true;
+        }
+
+        /** Release checkpoint lock. */
+        @TearDown(Level.Iteration)
+        public void tearDownIteration() {
+            if (checkpointLockAcquired) {
+                benchmark.checkpointManager().checkpointTimeoutLock().checkpointReadUnlock();
+                checkpointLockAcquired = false;
+            }
+        }
+
+        int nextZipfianIndex() {
+            return zipfianDistribution.next();
+        }
+
+        int threadIndex() {
+            return threadIndex;
+        }
+    }
+
+    private static final class MetricsSnapshot {
+        final long hits;
+        final long misses;
+        final long replacements;
+
+        MetricsSnapshot(long hits, long misses, long replacements) {
+            this.hits = hits;
+            this.misses = misses;
+            this.replacements = replacements;
+        }
+    }
+
+    /** Capture metrics before iteration. */
+    @Setup(Level.Iteration)
+    public void setupIteration() {
+        beforeMetrics = captureMetrics();
+    }
+
+    /** Print metrics delta after iteration. */
+    @TearDown(Level.Iteration)
+    public void tearDownIteration() {
+        MetricsSnapshot afterMetrics = captureMetrics();
+        printMetricsDelta(beforeMetrics, afterMetrics);
+    }
+
+    private void printMetricsDelta(MetricsSnapshot before, MetricsSnapshot after) {
+        long hits = after.hits - before.hits;
+        long misses = after.misses - before.misses;
+        long replacements = after.replacements - before.replacements;
+
+        long total = hits + misses;
+        double hitRate = total > 0 ? (hits * 100.0 / total) : 0.0;
+
+        System.out.printf("[%s/%s] Hits: %,d | Misses: %,d | Hit Rate: %.1f%% | Replacements: %,d%n",
+                replacementModeParam, cachePressure, hits, misses, hitRate, replacements);
+    }
+
+    private MetricsSnapshot captureMetrics() {
+        return new MetricsSnapshot(
+                persistentPageMemory().metrics().cacheHits(),
+                persistentPageMemory().metrics().cacheMisses(),
+                persistentPageMemory().metrics().replacements()
+        );
+    }
+
+    @Override
+    protected Config config() {
+        return Config.builder()
+                .regionSize(SMALL_REGION_SIZE)
+                .pageSize(PAGE_SIZE)
+                .replacementMode(replacementModeParam)
+                .partitionsCount(PARTITION_COUNT)
+                .build();
+    }
+
+    /**
+     * Prepare benchmark infrastructure and data.
+     */
+    @Setup
+    public void setup(Blackhole blackhole) throws Exception {
+        validateConfiguration();
+        setup();
+        prepareWorkingSet();
+
+        warmupCache(blackhole);
+        validateCacheWarmed();
+    }
+
+    private void validateConfiguration() {
+        workingSetSize = cachePressure.computeWorkingSetSize(REGION_CAPACITY_PAGES);
+
+        if (workingSetSize > MAX_WORKING_SET_SIZE) {
+            throw new IllegalStateException(String.format(
+                    "Working set too large: %,d pages (max: %,d). "
+                            + "This would require %,d GB of actual page memory and %,d MB for pageId array. "
+                            + "Reduce cache pressure or region size.",
+                    workingSetSize,
+                    MAX_WORKING_SET_SIZE,
+                    ((long) workingSetSize * PAGE_SIZE) / (1024L * 1024 * 1024),
+                    (workingSetSize * 8L) / (1024 * 1024)
+            ));
+        }
+    }
+
+    /**
+     * Allocates pages in batches to avoid OOM. Checkpoints between batches to flush dirty pages.
+     */
+    private void prepareWorkingSet() throws Exception {
+        pageIds = new long[workingSetSize];
+
+        // Allocate 80% of capacity at a time to avoid hitting dirty pages limit.
+        int batchSize = (int) Math.round(REGION_CAPACITY_PAGES * 0.8);
+
+        TestSimpleValuePageIo pageIo = new TestSimpleValuePageIo();
+
+        for (int batchStart = 0; batchStart < workingSetSize; batchStart += batchSize) {
+            int batchEnd = Math.min(batchStart + batchSize, workingSetSize);
+
+            checkpointManager().checkpointTimeoutLock().checkpointReadLock();
+            try {
+                // Distribute pages randomly across partitions.
+                for (int i = batchStart; i < batchEnd; i++) {
+                    int partitionId = partitionRandom.nextInt(PARTITION_COUNT);
+                    pageIds[i] = persistentPageMemory().allocatePage(null, GROUP_ID, partitionId, FLAG_DATA);
+                }
+
+                for (int i = batchStart; i < batchEnd; i++) {
+                    long pageId = pageIds[i];
+                    writePage(pageId, pageIo);
+                }
+            } finally {
+                checkpointManager().checkpointTimeoutLock().checkpointReadUnlock();
+            }
+
+            if (batchEnd < workingSetSize) {
+                flushDirtyPages();
+            }
+        }
+
+        flushDirtyPages();
+    }
+
+    private void flushDirtyPages() throws Exception {
+        CheckpointProgress progress = checkpointManager().forceCheckpoint("Flush dirty pages after allocation");
+
+        try {
+            progress.futureFor(CheckpointState.FINISHED).get(CHECKPOINT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            long dirtyPages = persistentPageMemory().dirtyPagesCount();
+            int usedCpBuf = persistentPageMemory().usedCheckpointBufferPages();
+            int maxCpBuf = persistentPageMemory().maxCheckpointBufferPages();
+
+            throw new IllegalStateException(String.format(
+                    "Checkpoint timed out after %d seconds [reason='Flush dirty pages after allocation', "
+                            + "dirtyPages=%,d, checkpointBuffer=%d/%d]. "
+                            + "Check disk health and system logs.",
+                    CHECKPOINT_TIMEOUT_SECONDS, dirtyPages, usedCpBuf, maxCpBuf
+            ), e);
+        } catch (ExecutionException e) {

Review Comment:
   I'm pretty sure we can unify these two `catch` blocks.
##########
modules/page-memory/src/jmh/java/org/apache/ignite/internal/pagememory/benchmark/PageReplacementBenchmark.java:
##########
@@ -0,0 +1,450 @@
+public class PageReplacementBenchmark extends PersistentPageMemoryBenchmarkBase {
+    private static final long SMALL_REGION_SIZE = 20L * 1024 * 1024;
+    private static final int PAGE_SIZE = 4 * Constants.KiB;

Review Comment:
   I would suggest making it explicitly equal to the default page size.
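
   Something like the following (a sketch; `DEFAULT_PAGE_SIZE` is a placeholder for whatever constant actually holds the production default page size, so the real name and location would need to be checked):

   ```java
   // Hypothetical: tie the benchmark to the production default instead of
   // hard-coding 4 KiB, so it stays representative if the default ever changes.
   private static final int PAGE_SIZE = DEFAULT_PAGE_SIZE;
   ```
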
##########
modules/page-memory/src/jmh/java/org/apache/ignite/internal/pagememory/benchmark/PageReplacementBenchmark.java:
##########
@@ -0,0 +1,450 @@
+    private void validateConfiguration() {
+        workingSetSize = cachePressure.computeWorkingSetSize(REGION_CAPACITY_PAGES);
+
+        if (workingSetSize > MAX_WORKING_SET_SIZE) {

Review Comment:
   I would suggest moving this check inside `computeWorkingSetSize`; that way we can get rid of this strange method.
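
   For example (untested sketch; it drops the GB/MB breakdown from the original message for brevity):

   ```java
   int computeWorkingSetSize(long capacity) {
       long result = Math.round(capacity * multiplier);

       // Subsumes the old assert: MAX_WORKING_SET_SIZE (1,000,000) also keeps the value in int range.
       if (result > MAX_WORKING_SET_SIZE) {
           throw new IllegalStateException(String.format(
                   "Working set too large: %,d pages (max: %,d). Reduce cache pressure or region size.",
                   result, MAX_WORKING_SET_SIZE));
       }

       return (int) result;
   }
   ```

   The call site then reduces to a single assignment: `workingSetSize = cachePressure.computeWorkingSetSize(REGION_CAPACITY_PAGES);`.
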
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]