This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 7d7a98b103 GEODE-10331: schedule delayed CloseEndpoint (#7849) 7d7a98b103 is described below commit 7d7a98b10355cb25985c031bfd2a67c77f1b6e43 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Mon Sep 19 08:17:42 2022 +0200 GEODE-10331: schedule delayed CloseEndpoint (#7849) * GEODE-10331: schedule delayed CloseEndpoint * GEODE-10331: added TCs --- .../distributed/internal/DistributionImpl.java | 20 +---- .../distributed/internal/direct/DirectChannel.java | 46 ++++++++++ .../distributed/internal/DistributionTest.java | 31 ++++--- .../internal/direct/DirectChannelTest.java | 100 +++++++++++++++++++++ 4 files changed, 168 insertions(+), 29 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java index fcba8e4c3e..68059bb898 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java @@ -74,7 +74,6 @@ import org.apache.geode.internal.serialization.SerializationContext; import org.apache.geode.internal.tcp.ConnectExceptions; import org.apache.geode.internal.tcp.ConnectionException; import org.apache.geode.internal.util.Breadcrumbs; -import org.apache.geode.logging.internal.executors.LoggingThread; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.security.AuthenticationRequiredException; import org.apache.geode.security.GemFireSecurityException; @@ -648,28 +647,13 @@ public class DistributionImpl implements Distribution { } } - private void destroyMember(final InternalDistributedMember member, final String reason) { + void destroyMember(final InternalDistributedMember member, final String reason) { final DirectChannel dc = directChannel; if (dc != null) { // Bug 37944: make sure this is always done in a separate thread, // so that shutdown conditions don't wedge the view lock // fix for bug 34010 - new LoggingThread("disconnect thread for " + member, () -> { - try { - Thread.sleep(Integer.getInteger("p2p.disconnectDelay", 3000)); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - // Keep going, try to close the endpoint. - } - if (!dc.isOpen()) { - return; - } - if (logger.isDebugEnabled()) { - logger.debug("Membership: closing connections for departed member {}", member); - } - // close connections, but don't do membership notification since it's already been done - dc.closeEndpoint(member, reason, false); - }).start(); + dc.scheduleCloseEndpoint(member, reason, false); } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java index eaac79f2b8..55108741a5 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java @@ -24,6 +24,9 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.logging.log4j.Logger; @@ -56,6 +59,7 @@ import org.apache.geode.internal.tcp.ConnectionException; import org.apache.geode.internal.tcp.MsgStreamer; import org.apache.geode.internal.tcp.TCPConduit; import org.apache.geode.internal.util.Breadcrumbs; +import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; /** @@ -86,6 +90,11 @@ public class DirectChannel { InternalDistributedMember localAddr; + private ScheduledExecutorService closeEndpointExecutor; + + private final int CLOSE_ENDPOINT_POOL_SIZE = + Integer.getInteger("DirectChannel.CLOSE_ENDPOINT_POOL_SIZE", 1); + /** * Callback to set the local address, must be done before this channel is used. * @@ -147,6 +156,9 @@ public class DirectChannel { logger.info("GemFire P2P Listener started on {}", conduit.getSocketId()); + closeEndpointExecutor = LoggingExecutors.newScheduledThreadPool(CLOSE_ENDPOINT_POOL_SIZE, + "DirectChannel.closeEndpoint", false); + } catch (ConnectionException ce) { logger.fatal(String.format("Unable to initialize direct channel because: %s", ce.getMessage()), @@ -667,6 +679,7 @@ public class DirectChannel { public synchronized void disconnect(Exception cause) { disconnected = true; disconnectCompleted = false; + closeEndpointExecutor.shutdownNow(); conduit.stop(cause); disconnectCompleted = true; } @@ -765,4 +778,37 @@ public class DirectChannel { public boolean hasReceiversFor(DistributedMember mbr) { return conduit.hasReceiversFor(mbr); } + + public void scheduleCloseEndpoint(InternalDistributedMember member, String reason, + boolean notifyDisconnect) { + if (disconnected) { + return; + } + closeEndpointExecutor.schedule(new CloseEndpointRunnable(member, reason, notifyDisconnect), + Integer.getInteger("p2p.disconnectDelay", 3000), TimeUnit.MILLISECONDS); + } + + int getCloseEndpointExecutorQueueSize() { + ScheduledThreadPoolExecutor implementation = + (ScheduledThreadPoolExecutor) closeEndpointExecutor; + return implementation.getQueue().size(); + } + + public class CloseEndpointRunnable implements Runnable { + + protected final InternalDistributedMember member; + protected final String reason; + protected final boolean notifyDisconnect; + + public CloseEndpointRunnable(InternalDistributedMember member, String reason, boolean notify) { + this.member = member; + this.reason = reason; + this.notifyDisconnect = notify; + } + + @Override + public void run() { + closeEndpoint(member, reason, notifyDisconnect); + } + } } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java index 029a5b99b8..f3844ce47b 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java @@ -15,13 +15,13 @@ package org.apache.geode.distributed.internal; import static org.apache.geode.distributed.internal.DistributionImpl.EMPTY_MEMBER_ARRAY; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -109,7 +109,7 @@ public class DistributionTest { m.setRecipients(recipients); Set<InternalDistributedMember> failures = distribution .directChannelSend(recipients, m); - assertTrue(failures == null); + assertThat(failures == null).isTrue(); verify(dc).send(any(), any(), any(), anyLong(), anyLong()); } @@ -126,9 +126,9 @@ public class DistributionTest { when(dc.send(any(), any(mockMembers.getClass()), any(DistributionMessage.class), anyLong(), anyLong())).thenThrow(exception); failures = distribution.directChannelSend(recipients, m); - assertTrue(failures != null); - assertEquals(1, failures.size()); - assertEquals(recipients.get(0), failures.iterator().next()); + assertThat(failures != null).isTrue(); + assertThat(failures).hasSize(1); + assertThat(failures.iterator().next()).isEqualTo(recipients.get(0)); } @Test @@ -154,10 +154,10 @@ public class DistributionTest { HighPriorityAckedMessage m = new HighPriorityAckedMessage(); when(membership.getAllMembers(EMPTY_MEMBER_ARRAY)).thenReturn(mockMembers); m.setRecipient(DistributionMessage.ALL_RECIPIENTS); - assertTrue(m.forAll()); + assertThat(m.forAll()).isTrue(); Set<InternalDistributedMember> failures = distribution .directChannelSend(null, m); - assertTrue(failures == null); + assertThat(failures == null).isTrue(); verify(dc).send(any(), isA(mockMembers.getClass()), isA(DistributionMessage.class), anyLong(), anyLong()); } @@ -188,8 +188,8 @@ public class DistributionTest { Set<InternalDistributedMember> failures = distribution.send(Collections.singletonList(mockMembers[0]), m); verify(membership, never()).send(any(), any()); - assertEquals(1, failures.size()); - assertEquals(mockMembers[0], failures.iterator().next()); + assertThat(failures).hasSize(1); + assertThat(failures.iterator().next()).isEqualTo(mockMembers[0]); } @Test @@ -228,4 +228,13 @@ public class DistributionTest { .isInstanceOf(SystemConnectException.class) .hasCause(exception); } + + @Test + public void testMemberDestroyed() throws Exception { + distribution.destroyMember(mockMembers[0], null); + distribution.destroyMember(mockMembers[1], null); + + verify(dc).scheduleCloseEndpoint(eq(mockMembers[0]), eq(null), eq(false)); + verify(dc).scheduleCloseEndpoint(eq(mockMembers[1]), eq(null), eq(false)); + } } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java new file mode 100644 index 0000000000..cb197a8c95 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.distributed.internal.direct; + + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Random; + +import org.jgroups.util.UUID; +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.distributed.internal.membership.api.Membership; +import org.apache.geode.distributed.internal.membership.api.MessageListener; +import org.apache.geode.internal.net.SocketCreatorFactory; +import org.apache.geode.internal.security.SecurableCommunicationChannel; + +public class DirectChannelTest { + + + private DirectChannel directChannel; + private InternalDistributedMember[] mockMembers; + + Membership<InternalDistributedMember> mgr; + MessageListener<InternalDistributedMember> listener; + ClusterDistributionManager dm; + + DistributionConfig dc; + + /** + * Some tests require a DirectChannel mock + */ + @Before + public void setUp() throws Exception { + listener = mock(MessageListener.class); + mgr = mock(Membership.class); + dm = mock(ClusterDistributionManager.class); + dc = mock(DistributionConfig.class); + + Random r = new Random(); + mockMembers = new InternalDistributedMember[5]; + for (int i = 0; i < mockMembers.length; i++) { + mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i); + UUID uuid = new UUID(r.nextLong(), r.nextLong()); + mockMembers[i].setUUID(uuid); + } + when(dm.getConfig()).thenReturn(dc); + when(dm.getSystem()).thenReturn(mock(InternalDistributedSystem.class)); + + int[] range = new int[2]; + range[0] = 41000; + range[1] = 61000; + when(dc.getMembershipPortRange()).thenReturn(range); + SecurableCommunicationChannel[] sslEnabledComponent = new SecurableCommunicationChannel[1]; + sslEnabledComponent[0] = SecurableCommunicationChannel.CLUSTER; + + when(dc.getSecurableCommunicationChannels()).thenReturn(sslEnabledComponent); + + SocketCreatorFactory.setDistributionConfig(dc); + directChannel = new DirectChannel(mgr, listener, dm); + } + + @Test + public void testScheduleCloseEndpoint() throws Exception { + directChannel.scheduleCloseEndpoint(mockMembers[0], null, false); + directChannel.scheduleCloseEndpoint(mockMembers[1], null, false); + directChannel.scheduleCloseEndpoint(mockMembers[2], null, false); + + assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(3); + } + + @Test + public void testScheduleCloseEndpointAndClearAllAtDisconnect() throws Exception { + directChannel.scheduleCloseEndpoint(mockMembers[0], null, false); + directChannel.scheduleCloseEndpoint(mockMembers[1], null, false); + directChannel.scheduleCloseEndpoint(mockMembers[2], null, false); + directChannel.disconnect(null); + + assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(0); + } +}