This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7d7a98b103 GEODE-10331: schedule delayed CloseEndpoint (#7849)
7d7a98b103 is described below

commit 7d7a98b10355cb25985c031bfd2a67c77f1b6e43
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Mon Sep 19 08:17:42 2022 +0200

    GEODE-10331: schedule delayed CloseEndpoint (#7849)
    
    * GEODE-10331: schedule delayed CloseEndpoint
    
    * GEODE-10331: added TCs
---
 .../distributed/internal/DistributionImpl.java     |  20 +----
 .../distributed/internal/direct/DirectChannel.java |  46 ++++++++++
 .../distributed/internal/DistributionTest.java     |  31 ++++---
 .../internal/direct/DirectChannelTest.java         | 100 +++++++++++++++++++++
 4 files changed, 168 insertions(+), 29 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index fcba8e4c3e..68059bb898 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -74,7 +74,6 @@ import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.internal.tcp.ConnectExceptions;
 import org.apache.geode.internal.tcp.ConnectionException;
 import org.apache.geode.internal.util.Breadcrumbs;
-import org.apache.geode.logging.internal.executors.LoggingThread;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.security.GemFireSecurityException;
@@ -648,28 +647,13 @@ public class DistributionImpl implements Distribution {
     }
   }
 
-  private void destroyMember(final InternalDistributedMember member, final String reason) {
+  void destroyMember(final InternalDistributedMember member, final String reason) {
     final DirectChannel dc = directChannel;
     if (dc != null) {
       // Bug 37944: make sure this is always done in a separate thread,
       // so that shutdown conditions don't wedge the view lock
       // fix for bug 34010
-      new LoggingThread("disconnect thread for " + member, () -> {
-        try {
-          Thread.sleep(Integer.getInteger("p2p.disconnectDelay", 3000));
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-          // Keep going, try to close the endpoint.
-        }
-        if (!dc.isOpen()) {
-          return;
-        }
-        if (logger.isDebugEnabled()) {
-          logger.debug("Membership: closing connections for departed member {}", member);
-        }
-        // close connections, but don't do membership notification since it's already been done
-        dc.closeEndpoint(member, reason, false);
-      }).start();
+      dc.scheduleCloseEndpoint(member, reason, false);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index eaac79f2b8..55108741a5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -24,6 +24,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.logging.log4j.Logger;
 
@@ -56,6 +59,7 @@ import org.apache.geode.internal.tcp.ConnectionException;
 import org.apache.geode.internal.tcp.MsgStreamer;
 import org.apache.geode.internal.tcp.TCPConduit;
 import org.apache.geode.internal.util.Breadcrumbs;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
@@ -86,6 +90,11 @@ public class DirectChannel {
 
   InternalDistributedMember localAddr;
 
+  private ScheduledExecutorService closeEndpointExecutor;
+
+  private final int CLOSE_ENDPOINT_POOL_SIZE =
+      Integer.getInteger("DirectChannel.CLOSE_ENDPOINT_POOL_SIZE", 1);
+
   /**
    * Callback to set the local address, must be done before this channel is used.
    *
@@ -147,6 +156,9 @@ public class DirectChannel {
       logger.info("GemFire P2P Listener started on {}",
           conduit.getSocketId());
 
+      closeEndpointExecutor = LoggingExecutors.newScheduledThreadPool(CLOSE_ENDPOINT_POOL_SIZE,
+          "DirectChannel.closeEndpoint", false);
+
     } catch (ConnectionException ce) {
       logger.fatal(String.format("Unable to initialize direct channel because: %s",
           ce.getMessage()),
@@ -667,6 +679,7 @@ public class DirectChannel {
   public synchronized void disconnect(Exception cause) {
     disconnected = true;
     disconnectCompleted = false;
+    closeEndpointExecutor.shutdownNow();
     conduit.stop(cause);
     disconnectCompleted = true;
   }
@@ -765,4 +778,37 @@ public class DirectChannel {
   public boolean hasReceiversFor(DistributedMember mbr) {
     return conduit.hasReceiversFor(mbr);
   }
+
+  public void scheduleCloseEndpoint(InternalDistributedMember member, String reason,
+      boolean notifyDisconnect) {
+    if (disconnected) {
+      return;
+    }
+    closeEndpointExecutor.schedule(new CloseEndpointRunnable(member, reason, notifyDisconnect),
+        Integer.getInteger("p2p.disconnectDelay", 3000), TimeUnit.MILLISECONDS);
+  }
+
+  int getCloseEndpointExecutorQueueSize() {
+    ScheduledThreadPoolExecutor implementation =
+        (ScheduledThreadPoolExecutor) closeEndpointExecutor;
+    return implementation.getQueue().size();
+  }
+
+  public class CloseEndpointRunnable implements Runnable {
+
+    protected final InternalDistributedMember member;
+    protected final String reason;
+    protected final boolean notifyDisconnect;
+
+    public CloseEndpointRunnable(InternalDistributedMember member, String reason, boolean notify) {
+      this.member = member;
+      this.reason = reason;
+      this.notifyDisconnect = notify;
+    }
+
+    @Override
+    public void run() {
+      closeEndpoint(member, reason, notifyDisconnect);
+    }
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java
index 029a5b99b8..f3844ce47b 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java
@@ -15,13 +15,13 @@
 package org.apache.geode.distributed.internal;
 
 import static org.apache.geode.distributed.internal.DistributionImpl.EMPTY_MEMBER_ARRAY;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -109,7 +109,7 @@ public class DistributionTest {
     m.setRecipients(recipients);
     Set<InternalDistributedMember> failures = distribution
         .directChannelSend(recipients, m);
-    assertTrue(failures == null);
+    assertThat(failures == null).isTrue();
     verify(dc).send(any(), any(),
         any(), anyLong(), anyLong());
   }
@@ -126,9 +126,9 @@ public class DistributionTest {
     when(dc.send(any(), any(mockMembers.getClass()),
         any(DistributionMessage.class), anyLong(), anyLong())).thenThrow(exception);
     failures = distribution.directChannelSend(recipients, m);
-    assertTrue(failures != null);
-    assertEquals(1, failures.size());
-    assertEquals(recipients.get(0), failures.iterator().next());
+    assertThat(failures != null).isTrue();
+    assertThat(failures).hasSize(1);
+    assertThat(failures.iterator().next()).isEqualTo(recipients.get(0));
   }
 
   @Test
@@ -154,10 +154,10 @@ public class DistributionTest {
     HighPriorityAckedMessage m = new HighPriorityAckedMessage();
     when(membership.getAllMembers(EMPTY_MEMBER_ARRAY)).thenReturn(mockMembers);
     m.setRecipient(DistributionMessage.ALL_RECIPIENTS);
-    assertTrue(m.forAll());
+    assertThat(m.forAll()).isTrue();
     Set<InternalDistributedMember> failures = distribution
         .directChannelSend(null, m);
-    assertTrue(failures == null);
+    assertThat(failures == null).isTrue();
     verify(dc).send(any(), isA(mockMembers.getClass()),
         isA(DistributionMessage.class), anyLong(), anyLong());
   }
@@ -188,8 +188,8 @@ public class DistributionTest {
     Set<InternalDistributedMember> failures =
         distribution.send(Collections.singletonList(mockMembers[0]), m);
     verify(membership, never()).send(any(), any());
-    assertEquals(1, failures.size());
-    assertEquals(mockMembers[0], failures.iterator().next());
+    assertThat(failures).hasSize(1);
+    assertThat(failures.iterator().next()).isEqualTo(mockMembers[0]);
   }
 
   @Test
@@ -228,4 +228,13 @@ public class DistributionTest {
         .isInstanceOf(SystemConnectException.class)
         .hasCause(exception);
   }
+
+  @Test
+  public void testMemberDestroyed() throws Exception {
+    distribution.destroyMember(mockMembers[0], null);
+    distribution.destroyMember(mockMembers[1], null);
+
+    verify(dc).scheduleCloseEndpoint(eq(mockMembers[0]), eq(null), eq(false));
+    verify(dc).scheduleCloseEndpoint(eq(mockMembers[1]), eq(null), eq(false));
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java
new file mode 100644
index 0000000000..cb197a8c95
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.direct;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Random;
+
+import org.jgroups.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.distributed.internal.membership.api.MessageListener;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+
+public class DirectChannelTest {
+
+
+  private DirectChannel directChannel;
+  private InternalDistributedMember[] mockMembers;
+
+  Membership<InternalDistributedMember> mgr;
+  MessageListener<InternalDistributedMember> listener;
+  ClusterDistributionManager dm;
+
+  DistributionConfig dc;
+
+  /**
+   * Some tests require a DirectChannel mock
+   */
+  @Before
+  public void setUp() throws Exception {
+    listener = mock(MessageListener.class);
+    mgr = mock(Membership.class);
+    dm = mock(ClusterDistributionManager.class);
+    dc = mock(DistributionConfig.class);
+
+    Random r = new Random();
+    mockMembers = new InternalDistributedMember[5];
+    for (int i = 0; i < mockMembers.length; i++) {
+      mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i);
+      UUID uuid = new UUID(r.nextLong(), r.nextLong());
+      mockMembers[i].setUUID(uuid);
+    }
+    when(dm.getConfig()).thenReturn(dc);
+    when(dm.getSystem()).thenReturn(mock(InternalDistributedSystem.class));
+
+    int[] range = new int[2];
+    range[0] = 41000;
+    range[1] = 61000;
+    when(dc.getMembershipPortRange()).thenReturn(range);
+    SecurableCommunicationChannel[] sslEnabledComponent = new SecurableCommunicationChannel[1];
+    sslEnabledComponent[0] = SecurableCommunicationChannel.CLUSTER;
+
+    when(dc.getSecurableCommunicationChannels()).thenReturn(sslEnabledComponent);
+
+    SocketCreatorFactory.setDistributionConfig(dc);
+    directChannel = new DirectChannel(mgr, listener, dm);
+  }
+
+  @Test
+  public void testScheduleCloseEndpoint() throws Exception {
+    directChannel.scheduleCloseEndpoint(mockMembers[0], null, false);
+    directChannel.scheduleCloseEndpoint(mockMembers[1], null, false);
+    directChannel.scheduleCloseEndpoint(mockMembers[2], null, false);
+
+    assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(3);
+  }
+
+  @Test
+  public void testScheduleCloseEndpointAndClearAllAtDisconnect() throws Exception {
+    directChannel.scheduleCloseEndpoint(mockMembers[0], null, false);
+    directChannel.scheduleCloseEndpoint(mockMembers[1], null, false);
+    directChannel.scheduleCloseEndpoint(mockMembers[2], null, false);
+    directChannel.disconnect(null);
+
+    assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(0);
+  }
+}

Reply via email to