Bill commented on a change in pull request #7409:
URL: https://github.com/apache/geode/pull/7409#discussion_r837942017



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
##########
@@ -2960,24 +2964,15 @@ private boolean readHandshakeForReceiver(final DataInput dis) {
       final boolean isSecure = authInit != null && !authInit.isEmpty();
 
       if (isSecure) {
-        if (owner.getConduit().waitForMembershipCheck(remoteMember)) {
-          sendOKHandshakeReply();
-          notifyHandshakeWaiter(true);
-        } else {
-          // check if we need notifyHandshakeWaiter() call.
+        if (!owner.getConduit().waitForMembershipCheck(remoteMember)) {
           notifyHandshakeWaiter(false);
-          logger.warn("{} timed out during a membership check.",
-              p2pReaderName());
+          logger.warn("{} timed out during a membership check.", p2pReaderName());
+          requestClose("timed out during a membership check");

Review comment:
       This line is the key to the fix (per @kamilla1201's analysis in the ticket).

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static 
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+  /**
+   * Create a sender connection and a receiver connection and pass data from
+   * one to the other.
+   *
+   * This test uses a real socket, but attempts to mock all other collaborators
+   * of connection, such as the InternalDistributedSystem.
+   */
+  @Test
+  public void 
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+      throws Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, true);
+
+    // Create a sender that connects to the server socket, which should 
trigger the
+    // reader to be created
+    final Connection sender = createWriter(serverSocketPort, false);
+    // Get the reader from the future
+    final Connection reader = readerFuture.get();
+
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(), 
any()));
+
+    assertThat(reader.isClosing()).isFalse();
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(0)).requestClose(any());
+  }
+
+  @Test
+  public void testReceiverClosesConnectionIfMembershipCheckFailed() throws 
Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, false);
+
+    final Connection sender = createWriter(serverSocketPort, true);
+
+    final Connection reader = readerFuture.get();
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> 
assertThat(assertThat(reader.isClosing()).isTrue()));
+
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(1)).requestClose("timed out during a membership 
check");
+  }
+
+  /**
+   * Start an asynchronous runnable that is waiting for a sender to connect to 
the socket
+   * When the sender connects, this runnable will create a receiver connection 
and
+   * return it to the future.
+   */
+  private CompletableFuture<Connection> createReaderFuture(ServerSocketChannel 
acceptorSocket,
+      boolean isSenderInView) {
+    return CompletableFuture.supplyAsync(
+        () -> createReceiverConnectionOnFirstAccept(acceptorSocket, 
isSenderInView));
+  }
+
+  /**
+   * Creates a socket for the receiver side. This is the server socket 
listening for connections.
+   */
+  private ServerSocketChannel createReceiverSocket() throws IOException {
+    final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+    acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+    return acceptorSocket;
+  }
+
+  /**
+   * Creates a dummy reply message.
+   */
+  private ReplyMessage createRelyMessage(Connection sender) {

Review comment:
       typo: "Rely" => "Reply"

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static 
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+  /**
+   * Create a sender connection and a receiver connection and pass data from
+   * one to the other.
+   *
+   * This test uses a real socket, but attempts to mock all other collaborators
+   * of connection, such as the InternalDistributedSystem.
+   */
+  @Test
+  public void 
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+      throws Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, true);
+
+    // Create a sender that connects to the server socket, which should 
trigger the
+    // reader to be created
+    final Connection sender = createWriter(serverSocketPort, false);
+    // Get the reader from the future
+    final Connection reader = readerFuture.get();
+
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(), 
any()));
+
+    assertThat(reader.isClosing()).isFalse();
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(0)).requestClose(any());
+  }
+
+  @Test
+  public void testReceiverClosesConnectionIfMembershipCheckFailed() throws 
Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, false);
+
+    final Connection sender = createWriter(serverSocketPort, true);
+
+    final Connection reader = readerFuture.get();
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> 
assertThat(assertThat(reader.isClosing()).isTrue()));
+
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(1)).requestClose("timed out during a membership 
check");
+  }
+
+  /**
+   * Start an asynchronous runnable that is waiting for a sender to connect to 
the socket
+   * When the sender connects, this runnable will create a receiver connection 
and
+   * return it to the future.
+   */
+  private CompletableFuture<Connection> createReaderFuture(ServerSocketChannel 
acceptorSocket,
+      boolean isSenderInView) {
+    return CompletableFuture.supplyAsync(
+        () -> createReceiverConnectionOnFirstAccept(acceptorSocket, 
isSenderInView));
+  }
+
+  /**
+   * Creates a socket for the receiver side. This is the server socket 
listening for connections.
+   */
+  private ServerSocketChannel createReceiverSocket() throws IOException {
+    final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+    acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+    return acceptorSocket;
+  }
+
+  /**
+   * Creates a dummy reply message.
+   */
+  private ReplyMessage createRelyMessage(Connection sender) {
+    final ReplyMessage msg = new ReplyMessage();
+    msg.setProcessorId(1);
+    msg.setRecipient(sender.getRemoteAddress());
+    return msg;
+  }
+
+  /**
+   * Create a sender that connects to the server socket.
+   */
+  private Connection createWriter(final int serverSocketPort, boolean 
isCancelInProgress)
+      throws IOException {
+    final ConnectionTable writerTable = mockConnectionTable();
+
+    final Membership<InternalDistributedMember> membership = 
mock(Membership.class);
+    final TCPConduit conduit = writerTable.getConduit();
+
+    
when(conduit.getCancelCriterion().isCancelInProgress()).thenReturn(isCancelInProgress);
+    when(conduit.getMembership()).thenReturn(membership);
+    when(membership.memberExists(any())).thenReturn(true);
+    final InternalDistributedMember remoteAddr =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 0, true, 
true);
+    remoteAddr.setDirectChannelPort(serverSocketPort);
+    final InternalDistributedMember senderAddr =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 1, true, 
true);
+    when(conduit.getDM().getCanonicalId(remoteAddr)).thenReturn(remoteAddr);
+    when(conduit.getDM().getCanonicalId(senderAddr)).thenReturn(senderAddr);
+    senderAddr.setDirectChannelPort(conduit.getPort());
+    when(conduit.getMemberId()).thenReturn(senderAddr);
+
+    return spy(Connection.createSender(membership, writerTable, true, 
remoteAddr, true,
+        System.currentTimeMillis(), 1000, 1000));
+  }
+
+  private Connection createReceiverConnectionOnFirstAccept(final 
ServerSocketChannel acceptorSocket,
+      boolean isSenderInView) {
+    try {
+      final SocketChannel readerSocket = acceptorSocket.accept();
+      final ConnectionTable readerTable = mockConnectionTable();
+      if (isSenderInView) {
+        
when(readerTable.getConduit().waitForMembershipCheck(any())).thenReturn(true);
+      }
+
+      final Connection reader = spy(new Connection(readerTable, 
readerSocket.socket()));
+      CompletableFuture.runAsync(() -> {
+        try {
+          reader.initReceiver();
+        } catch (final RuntimeException e) {
+          e.printStackTrace();
+          throw e;
+        }
+      });
+      return reader;
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ConnectionTable mockConnectionTable() throws UnknownHostException {
+    final ConnectionTable connectionTable = mock(ConnectionTable.class);
+    final Distribution distribution = mock(Distribution.class);
+    final DistributionManager distributionManager = 
mock(DistributionManager.class);
+    final DMStats dmStats = mock(DMStats.class);
+    final CancelCriterion stopper = mock(CancelCriterion.class);
+    final SocketCloser socketCloser = mock(SocketCloser.class);
+    final TCPConduit tcpConduit = mock(TCPConduit.class);
+    final ThreadsMonitoring threadMonitoring = mock(ThreadsMonitoring.class);
+    final AbstractExecutor threadMonitoringExecutor = 
mock(AbstractExecutor.class);
+    final DistributionConfig config = mock(DistributionConfig.class);
+
+    System.setProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT, 
"true");
+    when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+    when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getConfig()).thenReturn(config);
+    when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+    when(distributionManager.getDistribution()).thenReturn(distribution);
+    when(stopper.cancelInProgress()).thenReturn(null);
+    when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+    when(tcpConduit.getDM()).thenReturn(distributionManager);
+    when(tcpConduit.getSocketId()).thenReturn(new 
InetSocketAddress(getLocalHost(), 10337));
+    when(tcpConduit.getStats()).thenReturn(dmStats);
+    
when(distributionManager.getThreadMonitoring()).thenReturn(threadMonitoring);
+    
when(threadMonitoring.createAbstractExecutor(any())).thenReturn(threadMonitoringExecutor);
+    when(tcpConduit.getConfig()).thenReturn(config);

Review comment:
       Would it be possible to group these `when`s by subject, e.g. all the `connectionTable` ones together and all the `distributionManager` ones together?
   
   Ditto for the `when`s in the other methods.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static 
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+  /**
+   * Create a sender connection and a receiver connection and pass data from
+   * one to the other.
+   *
+   * This test uses a real socket, but attempts to mock all other collaborators
+   * of connection, such as the InternalDistributedSystem.
+   */
+  @Test
+  public void 
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+      throws Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, true);
+
+    // Create a sender that connects to the server socket, which should 
trigger the
+    // reader to be created
+    final Connection sender = createWriter(serverSocketPort, false);
+    // Get the reader from the future
+    final Connection reader = readerFuture.get();
+
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(), 
any()));
+
+    assertThat(reader.isClosing()).isFalse();
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(0)).requestClose(any());
+  }
+
+  @Test
+  public void testReceiverClosesConnectionIfMembershipCheckFailed() throws 
Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, false);
+
+    final Connection sender = createWriter(serverSocketPort, true);
+
+    final Connection reader = readerFuture.get();
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> 
assertThat(assertThat(reader.isClosing()).isTrue()));
+
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(1)).requestClose("timed out during a membership 
check");
+  }
+
+  /**
+   * Start an asynchronous runnable that is waiting for a sender to connect to 
the socket
+   * When the sender connects, this runnable will create a receiver connection 
and
+   * return it to the future.
+   */
+  private CompletableFuture<Connection> createReaderFuture(ServerSocketChannel 
acceptorSocket,
+      boolean isSenderInView) {
+    return CompletableFuture.supplyAsync(
+        () -> createReceiverConnectionOnFirstAccept(acceptorSocket, 
isSenderInView));
+  }
+
+  /**
+   * Creates a socket for the receiver side. This is the server socket 
listening for connections.
+   */
+  private ServerSocketChannel createReceiverSocket() throws IOException {
+    final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+    acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+    return acceptorSocket;
+  }
+
+  /**
+   * Creates a dummy reply message.
+   */
+  private ReplyMessage createRelyMessage(Connection sender) {
+    final ReplyMessage msg = new ReplyMessage();
+    msg.setProcessorId(1);
+    msg.setRecipient(sender.getRemoteAddress());
+    return msg;
+  }
+
+  /**
+   * Create a sender that connects to the server socket.
+   */
+  private Connection createWriter(final int serverSocketPort, boolean 
isCancelInProgress)
+      throws IOException {
+    final ConnectionTable writerTable = mockConnectionTable();
+
+    final Membership<InternalDistributedMember> membership = 
mock(Membership.class);
+    final TCPConduit conduit = writerTable.getConduit();
+
+    
when(conduit.getCancelCriterion().isCancelInProgress()).thenReturn(isCancelInProgress);
+    when(conduit.getMembership()).thenReturn(membership);
+    when(membership.memberExists(any())).thenReturn(true);
+    final InternalDistributedMember remoteAddr =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 0, true, 
true);
+    remoteAddr.setDirectChannelPort(serverSocketPort);
+    final InternalDistributedMember senderAddr =
+        new InternalDistributedMember(InetAddress.getLocalHost(), 1, true, 
true);
+    when(conduit.getDM().getCanonicalId(remoteAddr)).thenReturn(remoteAddr);
+    when(conduit.getDM().getCanonicalId(senderAddr)).thenReturn(senderAddr);
+    senderAddr.setDirectChannelPort(conduit.getPort());
+    when(conduit.getMemberId()).thenReturn(senderAddr);
+
+    return spy(Connection.createSender(membership, writerTable, true, 
remoteAddr, true,
+        System.currentTimeMillis(), 1000, 1000));
+  }
+
+  private Connection createReceiverConnectionOnFirstAccept(final 
ServerSocketChannel acceptorSocket,
+      boolean isSenderInView) {
+    try {
+      final SocketChannel readerSocket = acceptorSocket.accept();
+      final ConnectionTable readerTable = mockConnectionTable();
+      if (isSenderInView) {
+        
when(readerTable.getConduit().waitForMembershipCheck(any())).thenReturn(true);
+      }
+
+      final Connection reader = spy(new Connection(readerTable, 
readerSocket.socket()));
+      CompletableFuture.runAsync(() -> {
+        try {
+          reader.initReceiver();
+        } catch (final RuntimeException e) {
+          e.printStackTrace();
+          throw e;
+        }
+      });
+      return reader;
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ConnectionTable mockConnectionTable() throws UnknownHostException {
+    final ConnectionTable connectionTable = mock(ConnectionTable.class);
+    final Distribution distribution = mock(Distribution.class);
+    final DistributionManager distributionManager = 
mock(DistributionManager.class);
+    final DMStats dmStats = mock(DMStats.class);
+    final CancelCriterion stopper = mock(CancelCriterion.class);
+    final SocketCloser socketCloser = mock(SocketCloser.class);
+    final TCPConduit tcpConduit = mock(TCPConduit.class);
+    final ThreadsMonitoring threadMonitoring = mock(ThreadsMonitoring.class);
+    final AbstractExecutor threadMonitoringExecutor = 
mock(AbstractExecutor.class);
+    final DistributionConfig config = mock(DistributionConfig.class);
+
+    System.setProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT, 
"true");
+    when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+    when(connectionTable.getConduit()).thenReturn(tcpConduit);
+    when(connectionTable.getDM()).thenReturn(distributionManager);
+    when(distributionManager.getConfig()).thenReturn(config);
+    when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+    when(distributionManager.getDistribution()).thenReturn(distribution);
+    when(stopper.cancelInProgress()).thenReturn(null);
+    when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+    when(tcpConduit.getDM()).thenReturn(distributionManager);
+    when(tcpConduit.getSocketId()).thenReturn(new 
InetSocketAddress(getLocalHost(), 10337));
+    when(tcpConduit.getStats()).thenReturn(dmStats);
+    
when(distributionManager.getThreadMonitoring()).thenReturn(threadMonitoring);
+    
when(threadMonitoring.createAbstractExecutor(any())).thenReturn(threadMonitoringExecutor);
+    when(tcpConduit.getConfig()).thenReturn(config);
+    tcpConduit.tcpBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
+
+    doAnswer(invocationOnMock -> {
+      final Runnable runnable = (invocationOnMock.getArgument(0));
+      CompletableFuture.runAsync(() -> {
+        try {
+          runnable.run();
+
+        } catch (final Exception e) {
+          e.printStackTrace();
+          throw e;
+        }
+      });
+      try {
+        Thread.sleep(1000);

Review comment:
       What is the purpose of this `sleep()`? My concern is that it can 
contribute to nondeterminism in the test.
   
   Could the sleep be replaced with something deterministic like a 
`CountDownLatch`?
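   
   A minimal sketch of the latch idea, assuming the sleep is only there to give the asynchronous runnable time to start (I have not confirmed that this is its purpose). The class and method names below are hypothetical and use only `java.util.concurrent`; nothing here comes from the PR or from Geode:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; not part of the PR.
public class LatchInsteadOfSleep {

  /**
   * Runs the task on another thread and blocks until that thread has begun
   * executing it, or until the timeout elapses. Returns true if the start
   * signal was observed in time, false on timeout or interruption.
   */
  static boolean startAndAwait(final Runnable task, final long timeout, final TimeUnit unit) {
    final CountDownLatch started = new CountDownLatch(1);
    CompletableFuture.runAsync(() -> {
      started.countDown(); // signal that the async thread is running
      task.run();
    });
    try {
      // Returns as soon as the latch reaches zero instead of always waiting a fixed time.
      return started.await(timeout, unit);
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }

  public static void main(final String[] args) {
    final boolean started = startAndAwait(() -> {
    }, 5, TimeUnit.SECONDS);
    System.out.println("task started: " + started);
  }
}
```

   If the test instead needs to wait for the runnable to finish, the `countDown()` call could move into a `finally` block after `task.run()`. Either way the wait is bounded by a timeout rather than being a fixed delay.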

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static 
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static 
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+  /**
+   * Create a sender connection and a receiver connection and pass data from
+   * one to the other.
+   *
+   * This test uses a real socket, but attempts to mock all other collaborators

Review comment:
       Suggest replacing "attempts to mock" with "mocks".

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+  /**
+   * Create a sender connection and a receiver connection and pass data from
+   * one to the other.
+   *
+   * This test uses a real socket, but attempts to mock all other collaborators
+   * of connection, such as the InternalDistributedSystem.
+   */
+  @Test
+  public void 
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+      throws Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, true);
+
+    // Create a sender that connects to the server socket, which should 
trigger the
+    // reader to be created
+    final Connection sender = createWriter(serverSocketPort, false);
+    // Get the reader from the future
+    final Connection reader = readerFuture.get();
+
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();
+
+    await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(), 
any()));
+
+    assertThat(reader.isClosing()).isFalse();
+    verify(reader, times(1)).readHandshakeForReceiver(any());
+    verify(reader, times(0)).requestClose(any());
+  }
+
+  @Test
+  public void testReceiverClosesConnectionIfMembershipCheckFailed() throws 
Exception {
+    final DMStats stats = mock(DMStats.class);
+    final BufferPool bufferPool = new BufferPool(stats);
+    final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+    final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+    final CompletableFuture<Connection> readerFuture = 
createReaderFuture(acceptorSocket, false);
+
+    final Connection sender = createWriter(serverSocketPort, true);
+
+    final Connection reader = readerFuture.get();
+    final ReplyMessage msg = createRelyMessage(sender);
+
+    final List<Connection> connections = new ArrayList<>();
+    connections.add(sender);
+
+    final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg, 
false, stats, bufferPool);
+    streamer.writeMessage();

Review comment:
       All the setup in this method from about line 102 to about line 118 is 
identical to logic in the previous test (with the exception of the 
`isSenderInView` parameter to `createReaderFuture()`).
   
   Would it make sense to turn that duplicated code into a method that takes an 
`isSenderInView` parameter?
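
A rough sketch of the shape such a helper could take. Everything here is a hypothetical stand-in (`SharedSetupSketch`, `Fixture`, `createFixture`, and the string placeholder for the sender connection); it does not use the real Geode types and only shows the shared setup moving into one parameterized method.

```java
import java.util.ArrayList;
import java.util.List;

final class SharedSetupSketch {

  /** Hypothetical stand-in for the objects both tests build before asserting. */
  static final class Fixture {
    final boolean senderInView;
    final List<String> connections = new ArrayList<>();

    Fixture(boolean senderInView) {
      this.senderInView = senderInView;
    }
  }

  /** All common setup lives here; the flag is the only thing that varies. */
  static Fixture createFixture(boolean isSenderInView) {
    final Fixture fixture = new Fixture(isSenderInView);
    fixture.connections.add("sender"); // placeholder for the real sender Connection
    return fixture;
  }

  public static void main(String[] args) {
    final Fixture passing = createFixture(true); // membership check passes
    final Fixture failing = createFixture(false); // membership check fails
    System.out.println(passing.senderInView + " " + failing.senderInView);
  }
}
```

Each test would then shrink to one call to the helper followed by its own assertions. In the hunk above the second argument to `createWriter()` also differs between the two tests, so a real helper may need to account for that as well.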

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
##########
@@ -2960,24 +2964,15 @@ private boolean readHandshakeForReceiver(final 
DataInput dis) {
       final boolean isSecure = authInit != null && !authInit.isEmpty();
 
       if (isSecure) {
-        if (owner.getConduit().waitForMembershipCheck(remoteMember)) {
-          sendOKHandshakeReply();
-          notifyHandshakeWaiter(true);
-        } else {
-          // check if we need notifyHandshakeWaiter() call.
+        if (!owner.getConduit().waitForMembershipCheck(remoteMember)) {
           notifyHandshakeWaiter(false);
-          logger.warn("{} timed out during a membership check.",
-              p2pReaderName());
+          logger.warn("{} timed out during a membership check.", 
p2pReaderName());
+          requestClose("timed out during a membership check");
           return true;
         }
-      } else {
-        sendOKHandshakeReply();
-        try {
-          notifyHandshakeWaiter(true);
-        } catch (Exception e) {
-          logger.fatal("Uncaught exception from listener", e);
-        }
       }
+      sendOKHandshakeReply();
+      notifyHandshakeWaiter(true);

Review comment:
       The logic simplifications in this method make the flow easier to 
understand.
   
   The only functional change I see (besides the new `requestClose()`) is that 
if `notifyHandshakeWaiter()` throws an `Exception` we won't log a fatal-level 
message. That catch block was introduced by:
   
   GEODE-2113 Implement SSL over NIO 
   
   I perused `notifyHandshakeWaiter()` and the methods it calls and I don't see 
any possibility of unhandled exceptions. As a result I think it's ok to remove 
that catch block.
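
To illustrate the simplified flow, here is a toy model of the guard-clause structure in the hunk above. It is not the actual `Connection` code: `HandshakeFlowSketch`, `handshake` and the string call log are hypothetical, the two booleans stand in for `isSecure` and the result of `waitForMembershipCheck()`, and the `logger.warn()` call is omitted.

```java
import java.util.ArrayList;
import java.util.List;

final class HandshakeFlowSketch {

  /** Returns, in order, the names of the calls the simplified flow would make. */
  static List<String> handshake(boolean isSecure, boolean membershipCheckPassed) {
    final List<String> calls = new ArrayList<>();
    // Guard clause: the only path that bails out early is a secure
    // connection whose membership check did not pass.
    if (isSecure && !membershipCheckPassed) {
      calls.add("notifyHandshakeWaiter(false)");
      calls.add("requestClose");
      return calls;
    }
    // Every other path, secure or not, ends the same way.
    calls.add("sendOKHandshakeReply");
    calls.add("notifyHandshakeWaiter(true)");
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(handshake(true, false));
    System.out.println(handshake(true, true));
    System.out.println(handshake(false, false));
  }
}
```

With the early return in place, the secure and non-secure success paths share the same two trailing calls, which is what lets the old duplicated `else` branches go away.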



