pivotal-jbarrett commented on a change in pull request #127: URL: https://github.com/apache/geode-benchmarks/pull/127#discussion_r616995076
########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region<String, Session> region; + private final AtomicReference<String> sessionId = new AtomicReference<>(null); + + private final String id; + + private final List<String> existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection<Callable<Integer>> workloadTasks; + + public PutWithDeltaTask(String id) { + this.id = id; + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); + region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); + } + + @Override + public boolean test(Map<Object, Object> ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); + Session session = findById(this.sessionId.get()); + + return true; + } + + private Session findById(String id) { + return Optional.ofNullable(this.region.get(id)) + .map(Session::commit) + .map(Session::touch) + .orElseThrow(() -> new IllegalStateException( + String.format("No Session with ID [%s] was found", id))); + } + + private Session save(Session session) { + if (session != null && session.hasDelta()) { + this.region.put(session.getId(), session); + session.commit(); + } + + return session; + } + + private int runSessionWorkload() { + return workloadTasks.stream().mapToInt(integerCallable -> { + try { + return integerCallable.call(); + } catch (Exception e) { + e.printStackTrace(); + } + return 0; + }).sum(); + } + + private Collection<Callable<Integer>> newSessionWorkloadTasks() { + + Collection<Callable<Integer>> sessionWorkloadTasks = new LinkedList<>(); + + sessionWorkloadTasks.add(newAddSessionAttributeTask()); + sessionWorkloadTasks.add(newRemoveSessionAttributeTask()); + sessionWorkloadTasks.add(newSessionReaderTask()); + return sessionWorkloadTasks; + } + + private Callable<Integer> newAddSessionAttributeTask() { + return () -> { + Session session = findById(this.sessionId.get()); + + String name = UUID.randomUUID().toString(); + + session.setAttribute(name, System.currentTimeMillis()); + save(session); + + this.existingSessionAttributeNames.add(name); + return 1; + }; + } + + private Callable<Integer> newRemoveSessionAttributeTask() { + return () -> { + int returnValue = 0; + + Session session = findById(this.sessionId.get()); + String attributeName = null; + + int size = this.existingSessionAttributeNames.size(); + if (size > 0) { + int index = this.random.nextInt(size); + attributeName = this.existingSessionAttributeNames.remove(index); + } + + if (session.getAttributeNames().contains(attributeName)) { + session.removeAttribute(attributeName); + returnValue = -1; + } else { + Optional.ofNullable(attributeName) + .filter(it -> !it.trim().isEmpty()) + .ifPresent(this.existingSessionAttributeNames::add); + } + + save(session); + + return returnValue; + }; + } + + private Callable<Integer> newSessionReaderTask() { + return () -> { + Session session = findById(this.sessionId.get()); + save(session.touch()); + + return 0; + }; + } + + private int safeFutureGet(Future<Integer> future) { + try { + return future.get(); + } catch (Exception cause) { + throw new RuntimeException("Session access task failure", cause); + } + } + } + + private class CreateClientPool implements Task { Review comment: May want to consider extending the existing `CreateClient` `Task`. ########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region<String, Session> region; + private final AtomicReference<String> sessionId = new AtomicReference<>(null); + + private final String id; + + private final List<String> existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection<Callable<Integer>> workloadTasks; + + public PutWithDeltaTask(String id) { + this.id = id; + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); + region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); + } + + @Override + public boolean test(Map<Object, Object> ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); + Session session = findById(this.sessionId.get()); + + return true; + } + + private Session findById(String id) { + return Optional.ofNullable(this.region.get(id)) + .map(Session::commit) + .map(Session::touch) + .orElseThrow(() -> new IllegalStateException( + String.format("No Session with ID [%s] was found", id))); + } + + private Session save(Session session) { + if (session != null && session.hasDelta()) { + this.region.put(session.getId(), session); + session.commit(); + } + + return session; + } + + private int runSessionWorkload() { + return workloadTasks.stream().mapToInt(integerCallable -> { + try { + return integerCallable.call(); + } catch (Exception e) { + e.printStackTrace(); + } + return 0; + }).sum(); + } + + private Collection<Callable<Integer>> newSessionWorkloadTasks() { + + Collection<Callable<Integer>> sessionWorkloadTasks = new LinkedList<>(); + + sessionWorkloadTasks.add(newAddSessionAttributeTask()); + sessionWorkloadTasks.add(newRemoveSessionAttributeTask()); + sessionWorkloadTasks.add(newSessionReaderTask()); + return sessionWorkloadTasks; + } + + private Callable<Integer> newAddSessionAttributeTask() { + return () -> { + Session session = findById(this.sessionId.get()); + + String name = UUID.randomUUID().toString(); + + session.setAttribute(name, System.currentTimeMillis()); + save(session); + + this.existingSessionAttributeNames.add(name); + return 1; + }; + } + + private Callable<Integer> newRemoveSessionAttributeTask() { + return () -> { + int returnValue = 0; + + Session session = findById(this.sessionId.get()); + String attributeName = null; + + int size = this.existingSessionAttributeNames.size(); + if (size > 0) { + int index = this.random.nextInt(size); + attributeName = this.existingSessionAttributeNames.remove(index); + } + + if (session.getAttributeNames().contains(attributeName)) { + session.removeAttribute(attributeName); + returnValue = -1; + } else { + Optional.ofNullable(attributeName) + .filter(it -> !it.trim().isEmpty()) + .ifPresent(this.existingSessionAttributeNames::add); + } + + save(session); + + return returnValue; + }; + } + + private Callable<Integer> newSessionReaderTask() { + return () -> { + Session session = findById(this.sessionId.get()); + save(session.touch()); + + return 0; + }; + } + + private int safeFutureGet(Future<Integer> future) { + try { + return future.get(); + } catch (Exception cause) { + throw new RuntimeException("Session access task failure", cause); + } + } + } + + private class CreateClientPool implements Task { + + @Override + public void run(TestContext context) throws Exception { + PoolFactory poolFactory = PoolManager.createFactory(); + poolFactory + .addLocator("localhost", 10334) Review comment: See other tests on how to get the locator and port. This falsely assumes the locator is localhost but when running on multiple hosts this may not be the case. ########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region<String, Session> region; + private final AtomicReference<String> sessionId = new AtomicReference<>(null); + + private final String id; + + private final List<String> existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection<Callable<Integer>> workloadTasks; + + public PutWithDeltaTask(String id) { + this.id = id; + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); + region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); + } + + @Override + public boolean test(Map<Object, Object> ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); Review comment: Leave `assert` out of here. We never run with asserts enabled. Write a unit test for this benchmark that asserts the behavior you expect. ########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region<String, Session> region; + private final AtomicReference<String> sessionId = new AtomicReference<>(null); + + private final String id; + + private final List<String> existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection<Callable<Integer>> workloadTasks; + + public PutWithDeltaTask(String id) { + this.id = id; + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); + region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); + } + + @Override + public boolean test(Map<Object, Object> ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); + Session session = findById(this.sessionId.get()); + + return true; + } + + private Session findById(String id) { + return Optional.ofNullable(this.region.get(id)) + .map(Session::commit) + .map(Session::touch) + .orElseThrow(() -> new IllegalStateException( + String.format("No Session with ID [%s] was found", id))); + } + + private Session save(Session session) { + if (session != null && session.hasDelta()) { + this.region.put(session.getId(), session); + session.commit(); + } + + return session; + } + + private int runSessionWorkload() { + return workloadTasks.stream().mapToInt(integerCallable -> { + try { + return integerCallable.call(); + } catch (Exception e) { + e.printStackTrace(); + } + return 0; + }).sum(); + } + + private Collection<Callable<Integer>> newSessionWorkloadTasks() { + + Collection<Callable<Integer>> sessionWorkloadTasks = new LinkedList<>(); + + sessionWorkloadTasks.add(newAddSessionAttributeTask()); + sessionWorkloadTasks.add(newRemoveSessionAttributeTask()); + sessionWorkloadTasks.add(newSessionReaderTask()); + return sessionWorkloadTasks; + } + + private Callable<Integer> newAddSessionAttributeTask() { + return () -> { + Session session = findById(this.sessionId.get()); + + String name = UUID.randomUUID().toString(); Review comment: `UUID.randomUUID()` utilizes secure random to generate an array of bytes, which is not only more computationally more complex it also suffered similar contention issues that `Random` has. Also keeping in mind that those bytes are transient allocations and have impact on the GC and benchmark measurements. If UUID is desired then you may want to construct UUIDs byte fetching 2 longs from `ThreadLocalRandom`. ########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region<String, Session> region; + private final AtomicReference<String> sessionId = new AtomicReference<>(null); + + private final String id; + + private final List<String> existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); Review comment: Random isn't re-entrant and can be a bottleneck if 180 threads are trying to access it. Consider `ThreadLocalRandom`. ########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); Review comment: See the tests where these are not hard coded. ########## File path: geode-benchmarks/src/main/java/org/apache/geode/benchmark/tests/PartitionedWithDeltaAndUniqueObjectReferenceBenchmark.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.benchmark.tests; + +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.CLIENT; +import static org.apache.geode.benchmark.topology.ClientServerTopology.Roles.SERVER; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import benchmark.geode.data.Session; +import org.junit.jupiter.api.Test; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkDriverAdapter; + +import org.apache.geode.benchmark.tasks.CreatePartitionedRegion; +import org.apache.geode.benchmark.topology.ClientServerTopology; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.perftest.PerformanceTest; +import org.apache.geode.perftest.Task; +import org.apache.geode.perftest.TestConfig; +import org.apache.geode.perftest.TestContext; +import org.apache.geode.perftest.TestRunners; + +public class PartitionedWithDeltaAndUniqueObjectReferenceBenchmark implements PerformanceTest { + + private static final String SUBSCRIPTION_POOL = "subscriptionPool"; + private static final String ID = UUID.randomUUID().toString(); + + public PartitionedWithDeltaAndUniqueObjectReferenceBenchmark() {} + + @Test + public void run() throws Exception { + TestRunners.defaultRunner().runTest(this); + } + + @Override + public TestConfig configure() { + TestConfig config = GeodeBenchmark.createConfig(); + config.threads(180); + config.durationSeconds(300); + config.warmupSeconds(30); + ClientServerTopology.configure(config); + config.before(new CreatePartitionedRegion(), SERVER); + config.before(new CreateClientPool(), CLIENT); + config.before(new CreateClientProxyRegionWithPool(), CLIENT); + config.workload(new PutWithDeltaTask(ID), CLIENT); + return config; + } + + public static class PutWithDeltaTask extends BenchmarkDriverAdapter implements Serializable { + + private Region<String, Session> region; + private final AtomicReference<String> sessionId = new AtomicReference<>(null); + + private final String id; + + private final List<String> existingSessionAttributeNames = + new ArrayList<>(); + + private final Random random = new Random(System.currentTimeMillis()); + + private Collection<Callable<Integer>> workloadTasks; + + public PutWithDeltaTask(String id) { + this.id = id; + } + + @Override + public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + this.workloadTasks = newSessionWorkloadTasks(); + ClientCache cache = ClientCacheFactory.getAnyInstance(); + cache.setCopyOnRead(true); + region = cache.getRegion("region"); + save(Session.create(id)); + this.sessionId.set(id); + } + + @Override + public boolean test(Map<Object, Object> ctx) throws Exception { + int sessionAttributeCount = runSessionWorkload(); + + assert sessionAttributeCount == this.existingSessionAttributeNames.size(); + Session session = findById(this.sessionId.get()); + + return true; + } + + private Session findById(String id) { + return Optional.ofNullable(this.region.get(id)) + .map(Session::commit) + .map(Session::touch) + .orElseThrow(() -> new IllegalStateException( + String.format("No Session with ID [%s] was found", id))); + } + + private Session save(Session session) { + if (session != null && session.hasDelta()) { + this.region.put(session.getId(), session); + session.commit(); + } + + return session; + } + + private int runSessionWorkload() { Review comment: The workload is fixed and small so this seems like an overly complicated version of: ``` return workload1() + workload2() + workload3(); ``` We end up benchmarking the cost of streams and lambdas too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
