This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new fef9a4f GEODE-9961: GatewayReceiver rethrows CancelException (#7275) fef9a4f is described below commit fef9a4ff58182faaaa50bfd7b009ed31101f62d4 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Wed Jan 19 13:46:23 2022 +0100 GEODE-9961: GatewayReceiver rethrows CancelException (#7275) --- .../sockets/command/GatewayReceiverCommand.java | 4 + .../command/GatewayReceiverCommandTest.java | 168 +++++++++++++++++++++ 2 files changed, 172 insertions(+) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java index 5df0ebb..5ebd08f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommand.java @@ -715,6 +715,10 @@ public class GatewayReceiverCommand extends BaseCommand { private void handleException(boolean removeOnException, GatewayReceiverStats stats, Exception e) throws Exception { + if (e instanceof CancelException) { + throw e; + } + if (shouldThrowException(removeOnException)) { throw e; } else { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java new file mode 100644 index 0000000..e1272b0 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/GatewayReceiverCommandTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.tier.sockets.command; + + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.tier.CachedRegionHelper; +import org.apache.geode.internal.cache.tier.sockets.Message; +import org.apache.geode.internal.cache.tier.sockets.Part; +import org.apache.geode.internal.cache.tier.sockets.ServerConnection; +import org.apache.geode.internal.cache.wan.GatewayReceiverStats; +import org.apache.geode.internal.security.SecurityService; +import org.apache.geode.test.junit.categories.ClientServerTest; + +@Category({ClientServerTest.class}) +public class GatewayReceiverCommandTest { + + private static final String REGION_NAME = "region1"; + private static final String KEY = "key1"; + private static final Object VALUE = "value1"; + + private static final byte[] REMOVE_ON_EXCEPTION_BYTES = new byte[] {0}; + private static final byte[] POSSIBLE_DUPLICATE_BYTES = new byte[] {1}; + private static final byte[] CALLBACK_ARG_EXIST_BYTES = new byte[] {0}; + + @Mock + private EventID eventId; + + @Mock + private Message message; + @Mock + private SecurityService securityService; + @Mock + private ServerConnection serverConnection; + @Mock + private CachedRegionHelper cachedRegionHelper; + @Mock + private GatewayReceiverStats gatewayReceiverStats; + @Mock + private InternalCache cache; + + + @Mock + private Part numberOfEventsPart; + @Mock + private Part batchIdPart; + @Mock + private Part dsidPart; + @Mock + private Part removeOnExceptionPart; + @Mock + private Part actionTypePart; + @Mock + private Part possibleDuplicatePart; + @Mock + private Part regionNamePart; + @Mock + private Part eventIdPart; + @Mock + private Part keyPart; + @Mock + private Part valuePart; + @Mock + private Part callbackArgExistsPart; + @Mock + private Part versionTimeStampPart; + + private GatewayReceiverCommand gatewayReceiverCommand; + + @Before + public void setUp() throws Exception { + gatewayReceiverCommand = (GatewayReceiverCommand) GatewayReceiverCommand.getCommand(); + MockitoAnnotations.openMocks(this); + + when(serverConnection.getCachedRegionHelper()).thenReturn(cachedRegionHelper); + when(serverConnection.getCacheServerStats()).thenReturn(gatewayReceiverStats); + when(serverConnection.getLatestBatchIdReplied()).thenReturn(0); + + when(cachedRegionHelper.getCacheForGatewayCommand()).thenReturn(cache); + + when(numberOfEventsPart.getInt()).thenReturn(1); + when(batchIdPart.getInt()).thenReturn(1); + when(dsidPart.getInt()).thenReturn(1); + when(removeOnExceptionPart.getSerializedForm()).thenReturn(REMOVE_ON_EXCEPTION_BYTES); + + when(possibleDuplicatePart.getObject()).thenReturn(POSSIBLE_DUPLICATE_BYTES); + when(regionNamePart.getCachedString()).thenReturn(REGION_NAME); + when(eventIdPart.getObject()).thenReturn(eventId); + when(keyPart.getStringOrObject()).thenReturn(KEY); + when(valuePart.getStringOrObject()).thenReturn(VALUE); + when(callbackArgExistsPart.getObject()).thenReturn(CALLBACK_ARG_EXIST_BYTES); + when(versionTimeStampPart.getLong()).thenReturn(1l); + + when(message.getNumberOfParts()).thenReturn(12); + when(message.getPart(eq(0))).thenReturn(numberOfEventsPart); + when(message.getPart(eq(1))).thenReturn(batchIdPart); + when(message.getPart(eq(2))).thenReturn(dsidPart); + when(message.getPart(eq(3))).thenReturn(removeOnExceptionPart); + when(message.getPart(eq(4))).thenReturn(actionTypePart); + + when(message.getPart(eq(5))).thenReturn(possibleDuplicatePart); + when(message.getPart(eq(6))).thenReturn(regionNamePart); + when(message.getPart(eq(7))).thenReturn(eventIdPart); + when(message.getPart(eq(8))).thenReturn(keyPart); + when(message.getPart(eq(9))).thenReturn(valuePart); + when(message.getPart(eq(10))).thenReturn(callbackArgExistsPart); + when(message.getPart(eq(11))).thenReturn(versionTimeStampPart); + + } + + @Test + public void cacheClosedAtCreateEvent() throws Exception { + when(cache.getRegion(any())).thenThrow(CacheClosedException.class); + when(actionTypePart.getInt()).thenReturn(0); + + gatewayReceiverCommand.cmdExecute(message, serverConnection, securityService, 0); + verify(serverConnection).setFlagProcessMessagesAsFalse(); + } + + @Test + public void cacheClosedAtUpdateEvent() throws Exception { + when(cache.getRegion(any())).thenThrow(CacheClosedException.class); + when(actionTypePart.getInt()).thenReturn(1); + + gatewayReceiverCommand.cmdExecute(message, serverConnection, securityService, 0); + verify(serverConnection).setFlagProcessMessagesAsFalse(); + } + + @Test + public void cacheClosedAtDestroyEvent() throws Exception { + + when(message.getPart(eq(9))).thenReturn(callbackArgExistsPart); + when(message.getPart(eq(10))).thenReturn(versionTimeStampPart); + + when(cache.getRegion(any())).thenThrow(CacheClosedException.class); + when(actionTypePart.getInt()).thenReturn(2); + + gatewayReceiverCommand.cmdExecute(message, serverConnection, securityService, 0); + verify(serverConnection).setFlagProcessMessagesAsFalse(); + } + + +}