This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 16f2093 GEODE-8259: when client singlehop getAll encountered SerializationException, it should retry (#5253)
16f2093 is described below

commit 16f2093fa59b9f0f3b7600cc365d922d78d6e8dd
Author: Xiaojian Zhou <gesterz...@users.noreply.github.com>
AuthorDate: Tue Jun 30 15:19:39 2020 -0700

    GEODE-8259: when client singlehop getAll encountered SerializationException, it should retry (#5253)

    Co-authored-by: Xiaojian Zhou <gz...@pivotal.io>
    Co-authored-by: Anil <aging...@pivotal.io>

    (cherry picked from commit ee9a4b05277ff531d0d89d5d0fb65f63063557e3)
---
 .../geode/cache/client/internal/GetAllOp.java | 38 +++++---
 .../cache/client/internal/GetAllOpJUnitTest.java | 100 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 14 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetAllOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetAllOp.java
index 89c2251..c8c35a4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetAllOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetAllOp.java
@@ -22,6 +22,7 @@ import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.SerializationException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
@@ -74,23 +75,32 @@ public class GetAllOp {
       VersionedObjectList result = null;
       ServerConnectivityException se = null;
       List retryList = new ArrayList();
-      List callableTasks =
-          constructGetAllTasks(region.getFullPath(), serverToFilterMap, (PoolImpl) pool, callback);
-      Map<ServerLocation, Object> results =
-          SingleHopClientExecutor.submitGetAll(serverToFilterMap,
-              callableTasks, cms, (LocalRegion) region);
-      for (ServerLocation server : results.keySet()) {
-        Object serverResult = results.get(server);
-        if (serverResult instanceof ServerConnectivityException) {
-          se = (ServerConnectivityException) serverResult;
-          retryList.addAll(serverToFilterMap.get(server));
-        } else {
-          if (result == null) {
-            result = (VersionedObjectList) serverResult;
+      try {
+        List callableTasks =
+            constructGetAllTasks(region.getFullPath(), serverToFilterMap, (PoolImpl) pool,
+                callback);
+        Map<ServerLocation, Object> results =
+            SingleHopClientExecutor.submitGetAll(serverToFilterMap,
+                callableTasks, cms, (LocalRegion) region);
+        for (ServerLocation server : results.keySet()) {
+          Object serverResult = results.get(server);
+          if (serverResult instanceof ServerConnectivityException) {
+            se = (ServerConnectivityException) serverResult;
+            retryList.addAll(serverToFilterMap.get(server));
           } else {
-            result.addAll((VersionedObjectList) serverResult);
+            if (result == null) {
+              result = (VersionedObjectList) serverResult;
+            } else {
+              result.addAll((VersionedObjectList) serverResult);
+            }
           }
         }
+      } catch (ServerOperationException serverOperationException) {
+        if (!(serverOperationException.getCause() instanceof SerializationException)) {
+          throw serverOperationException;
+        }
+        se = serverOperationException;
+        retryList = keys;
       }
 
       if (se != null) {
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/GetAllOpJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/GetAllOpJUnitTest.java
new file mode 100644
index 0000000..f905225
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/GetAllOpJUnitTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.SerializationException;
+import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.execute.BucketMovedException;
+import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
+import org.apache.geode.test.fake.Fakes;
+
+public class GetAllOpJUnitTest {
+  private ExecutablePool pool = mock(PoolImpl.class);
+  private GemFireCacheImpl cache = Fakes.cache();
+  private LocalRegion region = mock(LocalRegion.class);
+  ArrayList<Integer> keys;
+
+  @Before
+  public void setup() {
+    when(region.getCache()).thenReturn(cache);
+    ClientMetadataService cms = mock(ClientMetadataService.class);
+    when(cache.getClientMetadataService()).thenReturn(cms);
+
+    keys = new ArrayList<>();
+    for (int i = 1; i <= 10; i++) {
+      keys.add(i);
+    }
+    Map<ServerLocation, HashSet> serverToFilterMap = new HashMap<>();
+    when(cms.getServerToFilterMap(keys, region, true)).thenReturn(serverToFilterMap);
+    ServerLocation serverLocation = new ServerLocation("localhost", 12345);
+    serverToFilterMap.put(serverLocation, new HashSet(keys));
+  }
+
+  @Test
+  public void singleHopGetAllShouldRetrySOECausedBySerialzationExp() {
+    when(region.getFullPath()).thenReturn("/testRegion")
+        .thenThrow(new ServerOperationException(new SerializationException("testRetry")))
+        .thenReturn("/testRegion");
+    VersionedObjectList vol = new VersionedObjectList();
+    when(pool.execute(any())).thenReturn(vol);
+    VersionedObjectList result = GetAllOp.execute(pool, region, keys, -1, null);
+    assertThat(result.getKeys()).isEqualTo(keys);
+    Mockito.verify(pool, times(1)).execute(any());
+  }
+
+  @Test(expected = ServerOperationException.class)
+  public void singleHopGetAllShouldNotRetrySOENotCausedBySerialzationExp() {
+    when(region.getFullPath()).thenReturn("/testRegion")
+        .thenThrow(new ServerOperationException(new IOException("testRetry")))
+        .thenReturn("/testRegion");
+    VersionedObjectList vol = new VersionedObjectList();
+    when(pool.execute(any())).thenReturn(vol);
+    VersionedObjectList result = GetAllOp.execute(pool, region, keys, -1, null);
+    assertThat(result).isNull();
+    Mockito.verify(pool, times(0)).execute(any());
+  }
+
+  @Test(expected = BucketMovedException.class)
+  public void singleHopGetAllShouldNotRetryForExceptionOtherThanSOE() {
+    when(region.getFullPath()).thenReturn("/testRegion")
+        .thenThrow(new BucketMovedException("testRetry"))
+        .thenReturn("/testRegion");
+    VersionedObjectList vol = new VersionedObjectList();
+    when(pool.execute(any())).thenReturn(vol);
+    VersionedObjectList result = GetAllOp.execute(pool, region, keys, -1, null);
+    assertThat(result).isNull();
+    Mockito.verify(pool, times(0)).execute(any());
+  }
+
+}