This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

The following commit(s) were added to refs/heads/develop by this push:
     new ac00f3c7e1 GEODE-10281: Fix WAN data inconsistency (#7665)
ac00f3c7e1 is described below

commit ac00f3c7e13ab7c4b5ed59af48381da97f6d1c0b
Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com>
AuthorDate: Thu Jul 7 14:15:38 2022 +0200

    GEODE-10281: Fix WAN data inconsistency (#7665)
---
 .../internal/cache/wan/GatewaySenderEventImpl.java |   2 +-
 .../cache/wan/GatewaySenderEventImplTest.java      |  87 ++++--
 ...eplicateRegionWithSerialGwsDistributedTest.java | 333 +++++++++++++++++++++
 3 files changed, 393 insertions(+), 29 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index 494e499168..d18a9a5d68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -853,7 +853,7 @@ public class GatewaySenderEventImpl
     // If the message is an update, it may be conflatable. If it is a
     // create, destroy, invalidate or destroy-region, it is not conflatable.
     // Only updates are conflated.
- return isUpdate(); + return isUpdate() && !isConcurrencyConflict(); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java index cec3e4b5a2..cf1f5d100e 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImplTest.java @@ -33,13 +33,15 @@ import java.io.DataInput; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.stream.Stream; -import junitparams.Parameters; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.MethodSource; import org.apache.geode.cache.Operation; import org.apache.geode.cache.TransactionId; @@ -61,18 +63,16 @@ import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.internal.serialization.VersionedDataOutputStream; import org.apache.geode.internal.util.BlobHelper; import org.apache.geode.test.fake.Fakes; -import org.apache.geode.test.junit.runners.GeodeParamsRunner; -@RunWith(GeodeParamsRunner.class) public class GatewaySenderEventImplTest { private GemFireCacheImpl cache; - @Rule - public TestName testName = new TestName(); + private String testName; - @Before - public void setUpGemFire() { + @BeforeEach + public void setUpGemFire(TestInfo testInfo) { + testName = testInfo.getDisplayName(); createCache(); } @@ -110,8 +110,8 @@ public class 
GatewaySenderEventImplTest { assertThat(gatewaySenderEvent.getTransactionId()).isNotNull(); } - @Test - @Parameters(method = "getVersionsAndExpectedInvocations") + @ParameterizedTest + @MethodSource("getVersionsAndExpectedInvocations") public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations vaei) throws IOException { GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class); @@ -129,8 +129,8 @@ public class GatewaySenderEventImplTest { any()); } - @Test - @Parameters(method = "getVersionsAndExpectedInvocations") + @ParameterizedTest + @MethodSource("getVersionsAndExpectedInvocations") public void testDeserializingDataFromOldVersionToCurrentVersion( VersionAndExpectedInvocations vaei) throws IOException, ClassNotFoundException { @@ -151,18 +151,17 @@ public class GatewaySenderEventImplTest { any()); } - private VersionAndExpectedInvocations[] getVersionsAndExpectedInvocations() { - return new VersionAndExpectedInvocations[] { - new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0), - new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0), - new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 1) - }; + private static Stream<Arguments> getVersionsAndExpectedInvocations() { + return Stream.of( + Arguments.of(new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0)), + Arguments.of(new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0)), + Arguments.of(new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 1))); } @Test public void testEquality() throws Exception { LocalRegion region = mock(LocalRegion.class); - when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region"); + when(region.getFullPath()).thenReturn(testName + "_region"); when(region.getCache()).thenReturn(cache); Object event = ParallelGatewaySenderHelper.createGatewaySenderEvent(region, Operation.CREATE, "key1", "value1", 0, 0, 0, 0); @@ -209,7 +208,7 @@ public class GatewaySenderEventImplTest { 
assertThat(event).isNotEqualTo(eventDifferentValue); LocalRegion region2 = mock(LocalRegion.class); - when(region2.getFullPath()).thenReturn(testName.getMethodName() + "_region2"); + when(region2.getFullPath()).thenReturn(testName + "_region2"); when(region2.getCache()).thenReturn(cache); Object eventDifferentRegion = ParallelGatewaySenderHelper.createGatewaySenderEvent(region2, Operation.CREATE, @@ -221,7 +220,7 @@ public class GatewaySenderEventImplTest { public void testSerialization() throws Exception { // Set up test LocalRegion region = mock(LocalRegion.class); - when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region"); + when(region.getFullPath()).thenReturn(testName + "_region"); when(region.getCache()).thenReturn(cache); TXId txId = new TXId(cache.getMyId(), 0); when(region.getTXId()).thenReturn(txId); @@ -348,12 +347,13 @@ public class GatewaySenderEventImplTest { return cacheEvent; } - @Parameters({"true, true", "true, false", "false, false"}) + @ParameterizedTest + @CsvSource({"true,true", "true,false", "false,false"}) public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerateCallbacks, boolean isCallbackArgumentNull) throws IOException { - InternalRegion region = mock(InternalRegion.class); - when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region"); + InternalRegion region = mock(LocalRegion.class); + when(region.getFullPath()).thenReturn(testName + "_region"); Operation operation = mock(Operation.class); when(operation.isLocalLoad()).thenReturn(true); @@ -377,6 +377,37 @@ public class GatewaySenderEventImplTest { assertThat(event.getAction()).isEqualTo(action); } + @Test + public void testShouldNotBeConflatedCreate() throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class)); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, INCLUDE); + + 
assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse(); + } + + @Test + public void testShouldBeConflatedUpdate() throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class)); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent, null, INCLUDE); + + assertThat(gatewaySenderEvent.shouldBeConflated()).isTrue(); + } + + @Test + public void testShouldNotBeConflatedUpdateConcurrentConflict() throws IOException { + final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class)); + when(cacheEvent.isConcurrencyConflict()).thenReturn(true); + + final GatewaySenderEventImpl gatewaySenderEvent = + new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent, null, INCLUDE); + + assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse(); + } + public static class VersionAndExpectedInvocations { private final KnownVersion version; diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java new file mode 100644 index 0000000000..c940ade012 --- /dev/null +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.wan.serial; + +import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.CacheWriter; +import org.apache.geode.cache.CacheWriterException; +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionEvent; +import org.apache.geode.cache.wan.GatewayReceiver; +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InternalRegion; +import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.management.internal.cli.util.CommandStringBuilder; +import 
org.apache.geode.management.internal.i18n.CliStrings; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.WanTest; +import org.apache.geode.test.junit.rules.GfshCommandRule; + +@Category({WanTest.class}) +public class InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest + implements Serializable { + + @Rule + public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9); + + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + + public static boolean ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER; + + private MemberVM locator1Site2; + + private MemberVM server1Site1; + private MemberVM server2Site1; + + private MemberVM server1Site2; + private MemberVM server2Site2; + + private int server1Site2Port; + private int server2Site2Port; + + private ClientVM clientConnectedToServer1Site2; + private ClientVM clientConnectedToServer2Site2; + + private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1"; + private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2"; + private static final String REGION_NAME = "test1"; + + private static final String GATEWAY_SENDER_ID = "ln"; + + private final Map.Entry<Integer, Integer> ENTRY_INITIAL = new AbstractMap.SimpleEntry<>(1, 0); + private final Map.Entry<Integer, Integer> ENTRY_CONFLICT_RESOLUTION_WINNER = + new AbstractMap.SimpleEntry<>(1, 1); + private final Map.Entry<Integer, Integer> ENTRY_CONFLICT_RESOLUTION_LOSER = + new AbstractMap.SimpleEntry<>(1, 2); + + @Before + public void setupMultiSite() throws Exception { + Properties props = new Properties(); + props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1); + MemberVM locator1Site1 = clusterStartupRule.startLocatorVM(0, props); + MemberVM locator2Site1 = clusterStartupRule.startLocatorVM(1, props, 
locator1Site1.getPort()); + + // start servers for site #1 + server1Site1 = + clusterStartupRule.startServerVM(2, locator1Site1.getPort(), locator2Site1.getPort()); + server2Site1 = + clusterStartupRule.startServerVM(3, locator1Site1.getPort(), locator2Site1.getPort()); + connectGfshToSite(locator1Site1); + + // create partition region on site #1 + CommandStringBuilder regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION); + regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME); + regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "REPLICATE"); + + gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess(); + + String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER) + .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost") + .getCommandString(); + + gfsh.executeAndAssertThat(csb).statusIsSuccess(); + + server1Site1.invoke( + InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::verifyReceiverState); + server2Site1.invoke( + InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::verifyReceiverState); + + props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2); + props.setProperty(REMOTE_LOCATORS, + "localhost[" + locator1Site1.getPort() + "],localhost[" + locator2Site1.getPort() + "]"); + locator1Site2 = clusterStartupRule.startLocatorVM(5, props); + + // start servers for site #2 + server1Site2 = clusterStartupRule.startServerVM(6, locator1Site2.getPort()); + server2Site2 = clusterStartupRule.startServerVM(7, locator1Site2.getPort()); + + server2Site2Port = server2Site2.getPort(); + server1Site2Port = server1Site2.getPort(); + + // create gateway-sender on site #2 + connectGfshToSite(locator1Site2); + String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER) + .addOption(CliStrings.MEMBERS, server2Site2.getName()) + .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, GATEWAY_SENDER_ID) + 
.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "1") + .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "false") + .addOption(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION, "true") + .getCommandString(); + gfsh.executeAndAssertThat(command).statusIsSuccess(); + + verifyGatewaySenderState(server2Site2, false); + + executeGatewaySenderActionCommandSite2(CliStrings.PAUSE_GATEWAYSENDER); + + // create partition region on site #2 + regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION); + regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME); + regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "REPLICATE"); + regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, GATEWAY_SENDER_ID); + gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess(); + } + + @Test + public void testEventIsNotConflatedWhenConcurrentModificationIsDetected() throws Exception { + startClientToServer1Site2(server1Site2Port); + startClientToServer2Site2(server2Site2Port); + + clientConnectedToServer2Site2.invoke(() -> executePutOperation(ENTRY_INITIAL)); + waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_INITIAL, server1Site2, server2Site2); + + // Configure cache writer on server to delay writing of entry in order to provoke + // the internal conflict + server1Site2.invoke(() -> { + InternalRegion region = + ClusterStartupRule.getCache().getInternalRegionByPath("/" + REGION_NAME); + region.getAttributesMutator().setCacheWriter(new TestCacheWriterDelayWritingOfEntry( + ENTRY_CONFLICT_RESOLUTION_WINNER, ENTRY_CONFLICT_RESOLUTION_LOSER)); + }); + + clientConnectedToServer2Site2.invokeAsync(() -> executePutOperation( + ENTRY_CONFLICT_RESOLUTION_WINNER)); + + server1Site2.invoke(() -> await().untilAsserted(() -> assertThat( + InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER) + .isTrue())); + + 
clientConnectedToServer1Site2.invokeAsync(() -> executePutOperation( + ENTRY_CONFLICT_RESOLUTION_LOSER)); + + // Check that expected entry has won the internal conflict resolution + waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_CONFLICT_RESOLUTION_WINNER, + server1Site2, + server2Site2); + + server2Site2.invoke( + InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest::awaitQueueSize); + executeGatewaySenderActionCommandSite2(CliStrings.RESUME_GATEWAYSENDER); + + // check that expected event is replicated to the remote cluster + waitUntilEventIsConsistentlyReplicatedAcrossServers(ENTRY_CONFLICT_RESOLUTION_WINNER, + server1Site1, + server2Site1); + } + + private void waitUntilEventIsConsistentlyReplicatedAcrossServers( + final Map.Entry<Integer, Integer> entry, + MemberVM... servers) { + await().untilAsserted(() -> isEventIsConsistentlyReplicatedAcrossServers(entry, servers)); + } + + private static void isEventIsConsistentlyReplicatedAcrossServers( + final Map.Entry<Integer, Integer> entry, + MemberVM... 
servers) { + for (MemberVM server : servers) { + assertThat(server.invoke(() -> doesEventExistOnServer(entry))).isTrue(); + } + } + + private static boolean doesEventExistOnServer(Map.Entry<Integer, Integer> entry) { + Region<Integer, Integer> region = + ClusterStartupRule.getCache().getRegion("/" + REGION_NAME); + return Objects.equals(region.get(entry.getKey()), entry.getValue()); + } + + private void executeGatewaySenderActionCommandSite2(final String action) throws Exception { + connectGfshToSite(locator1Site2); + CommandStringBuilder regionCmd = new CommandStringBuilder(action); + regionCmd.addOption(CliStrings.MEMBERS, server2Site2.getName()); + regionCmd.addOption(CliStrings.PAUSE_GATEWAYSENDER__ID, GATEWAY_SENDER_ID); + gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess(); + + verifyGatewaySenderState(server2Site2, CliStrings.PAUSE_GATEWAYSENDER.equals(action)); + } + + private void executePutOperation(Map.Entry<Integer, Integer> entry) { + Region<Integer, Integer> region = + ClusterStartupRule.clientCacheRule.getCache().getRegion(REGION_NAME); + region.put(entry.getKey(), entry.getValue()); + } + + private static void awaitQueueSize() { + await() + .untilAsserted(() -> validateQueueSize( + InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.GATEWAY_SENDER_ID, + 3)); + } + + private static void validateQueueSize(String senderId, int numQueueEntries) { + GatewaySender sender = ClusterStartupRule.getCache().getGatewaySender(senderId); + Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues(); + int size = 0; + for (RegionQueue q : queues) { + size += q.size(); + } + assertThat(size).isEqualTo(numQueueEntries); + } + + private static void verifyReceiverState() { + Set<GatewayReceiver> receivers = ClusterStartupRule.getCache().getGatewayReceivers(); + for (GatewayReceiver receiver : receivers) { + assertThat(receiver.isRunning()).isEqualTo(true); + } + } + + private void verifyGatewaySenderState(MemberVM memberVM, 
boolean isPaused) { + memberVM.invoke(() -> verifySenderState(GATEWAY_SENDER_ID, true, isPaused)); + locator1Site2.invoke( + () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), GATEWAY_SENDER_ID, true, + isPaused)); + } + + private static InternalDistributedMember getMember(final VM vm) { + return vm.invoke(() -> ClusterStartupRule.getCache().getMyId()); + } + + private void startClientToServer1Site2(final int serverPort) throws Exception { + clientConnectedToServer1Site2 = + clusterStartupRule.startClientVM(8, c -> c.withServerConnection(serverPort)); + clientConnectedToServer1Site2.invoke(() -> { + ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME); + }); + } + + private void startClientToServer2Site2(final int serverPort) throws Exception { + clientConnectedToServer2Site2 = + clusterStartupRule.startClientVM(4, c -> c.withServerConnection(serverPort)); + clientConnectedToServer2Site2.invoke(() -> { + ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME); + }); + } + + private void connectGfshToSite(MemberVM locator) throws Exception { + if (gfsh.isConnected()) { + gfsh.disconnect(); + } + gfsh.connectAndVerify(locator); + } + + public static class TestCacheWriterDelayWritingOfEntry<K, V> implements CacheWriter<K, V> { + private final Map.Entry<Integer, Integer> entryToDelay; + + private final Map.Entry<Integer, Integer> waitUntilEntry; + + public TestCacheWriterDelayWritingOfEntry(Map.Entry<Integer, Integer> entryToDelay, + Map.Entry<Integer, Integer> waitUntilEntry) { + this.entryToDelay = entryToDelay; + this.waitUntilEntry = waitUntilEntry; + } + + @Override + public void beforeUpdate(EntryEvent<K, V> event) throws CacheWriterException { + Region<Integer, Integer> region = ClusterStartupRule.getCache().getRegion("/" + REGION_NAME); + int value = (Integer) event.getNewValue(); + int key = (Integer) event.getKey(); + if (key == entryToDelay.getKey() && value == entryToDelay.getValue()) { + 
InternalConflictResolutionReplicateRegionWithSerialGwsDistributedTest.ENTRY_CONFLICT_WINNER_HAS_REACHED_THE_REDUNDANT_SERVER = + true; + await().untilAsserted(() -> assertThat(region.get(waitUntilEntry.getKey())) + .isEqualTo(waitUntilEntry.getValue())); + } + } + + @Override + public void beforeCreate(EntryEvent<K, V> event) throws CacheWriterException {} + + @Override + public void beforeDestroy(EntryEvent<K, V> event) throws CacheWriterException {} + + @Override + public void beforeRegionDestroy(RegionEvent<K, V> event) throws CacheWriterException {} + + @Override + public void beforeRegionClear(RegionEvent<K, V> event) throws CacheWriterException {} + } +}