Bill commented on a change in pull request #7409:
URL: https://github.com/apache/geode/pull/7409#discussion_r837942017
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
##########
@@ -2960,24 +2964,15 @@ private boolean readHandshakeForReceiver(final
DataInput dis) {
final boolean isSecure = authInit != null && !authInit.isEmpty();
if (isSecure) {
- if (owner.getConduit().waitForMembershipCheck(remoteMember)) {
- sendOKHandshakeReply();
- notifyHandshakeWaiter(true);
- } else {
- // check if we need notifyHandshakeWaiter() call.
+ if (!owner.getConduit().waitForMembershipCheck(remoteMember)) {
notifyHandshakeWaiter(false);
- logger.warn("{} timed out during a membership check.",
- p2pReaderName());
+ logger.warn("{} timed out during a membership check.",
p2pReaderName());
+ requestClose("timed out during a membership check");
Review comment:
This line is the key to the fix (per @kamilla1201's analysis in the
ticket)
##########
File path:
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+ /**
+ * Create a sender connection and a receiver connection and pass data from
+ * one to the other.
+ *
+ * This test uses a real socket, but attempts to mock all other collaborators
+ * of connection, such as the InternalDistributedSystem.
+ */
+ @Test
+ public void
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+ throws Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, true);
+
+ // Create a sender that connects to the server socket, which should
trigger the
+ // reader to be created
+ final Connection sender = createWriter(serverSocketPort, false);
+ // Get the reader from the future
+ final Connection reader = readerFuture.get();
+
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(),
any()));
+
+ assertThat(reader.isClosing()).isFalse();
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(0)).requestClose(any());
+ }
+
+ @Test
+ public void testReceiverClosesConnectionIfMembershipCheckFailed() throws
Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, false);
+
+ final Connection sender = createWriter(serverSocketPort, true);
+
+ final Connection reader = readerFuture.get();
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() ->
assertThat(assertThat(reader.isClosing()).isTrue()));
+
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(1)).requestClose("timed out during a membership
check");
+ }
+
+ /**
+ * Start an asynchronous runnable that is waiting for a sender to connect to
the socket
+ * When the sender connects, this runnable will create a receiver connection
and
+ * return it to the future.
+ */
+ private CompletableFuture<Connection> createReaderFuture(ServerSocketChannel
acceptorSocket,
+ boolean isSenderInView) {
+ return CompletableFuture.supplyAsync(
+ () -> createReceiverConnectionOnFirstAccept(acceptorSocket,
isSenderInView));
+ }
+
+ /**
+ * Creates a socket for the receiver side. This is the server socket
listening for connections.
+ */
+ private ServerSocketChannel createReceiverSocket() throws IOException {
+ final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+ acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+ return acceptorSocket;
+ }
+
+ /**
+ * Creates a dummy reply message.
+ */
+ private ReplyMessage createRelyMessage(Connection sender) {
Review comment:
typo: "Rely" => "Reply"
##########
File path:
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+ /**
+ * Create a sender connection and a receiver connection and pass data from
+ * one to the other.
+ *
+ * This test uses a real socket, but attempts to mock all other collaborators
+ * of connection, such as the InternalDistributedSystem.
+ */
+ @Test
+ public void
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+ throws Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, true);
+
+ // Create a sender that connects to the server socket, which should
trigger the
+ // reader to be created
+ final Connection sender = createWriter(serverSocketPort, false);
+ // Get the reader from the future
+ final Connection reader = readerFuture.get();
+
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(),
any()));
+
+ assertThat(reader.isClosing()).isFalse();
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(0)).requestClose(any());
+ }
+
+ @Test
+ public void testReceiverClosesConnectionIfMembershipCheckFailed() throws
Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, false);
+
+ final Connection sender = createWriter(serverSocketPort, true);
+
+ final Connection reader = readerFuture.get();
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() ->
assertThat(assertThat(reader.isClosing()).isTrue()));
+
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(1)).requestClose("timed out during a membership
check");
+ }
+
+ /**
+ * Start an asynchronous runnable that is waiting for a sender to connect to
the socket
+ * When the sender connects, this runnable will create a receiver connection
and
+ * return it to the future.
+ */
+ private CompletableFuture<Connection> createReaderFuture(ServerSocketChannel
acceptorSocket,
+ boolean isSenderInView) {
+ return CompletableFuture.supplyAsync(
+ () -> createReceiverConnectionOnFirstAccept(acceptorSocket,
isSenderInView));
+ }
+
+ /**
+ * Creates a socket for the receiver side. This is the server socket
listening for connections.
+ */
+ private ServerSocketChannel createReceiverSocket() throws IOException {
+ final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+ acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+ return acceptorSocket;
+ }
+
+ /**
+ * Creates a dummy reply message.
+ */
+ private ReplyMessage createRelyMessage(Connection sender) {
+ final ReplyMessage msg = new ReplyMessage();
+ msg.setProcessorId(1);
+ msg.setRecipient(sender.getRemoteAddress());
+ return msg;
+ }
+
+ /**
+ * Create a sender that connects to the server socket.
+ */
+ private Connection createWriter(final int serverSocketPort, boolean
isCancelInProgress)
+ throws IOException {
+ final ConnectionTable writerTable = mockConnectionTable();
+
+ final Membership<InternalDistributedMember> membership =
mock(Membership.class);
+ final TCPConduit conduit = writerTable.getConduit();
+
+
when(conduit.getCancelCriterion().isCancelInProgress()).thenReturn(isCancelInProgress);
+ when(conduit.getMembership()).thenReturn(membership);
+ when(membership.memberExists(any())).thenReturn(true);
+ final InternalDistributedMember remoteAddr =
+ new InternalDistributedMember(InetAddress.getLocalHost(), 0, true,
true);
+ remoteAddr.setDirectChannelPort(serverSocketPort);
+ final InternalDistributedMember senderAddr =
+ new InternalDistributedMember(InetAddress.getLocalHost(), 1, true,
true);
+ when(conduit.getDM().getCanonicalId(remoteAddr)).thenReturn(remoteAddr);
+ when(conduit.getDM().getCanonicalId(senderAddr)).thenReturn(senderAddr);
+ senderAddr.setDirectChannelPort(conduit.getPort());
+ when(conduit.getMemberId()).thenReturn(senderAddr);
+
+ return spy(Connection.createSender(membership, writerTable, true,
remoteAddr, true,
+ System.currentTimeMillis(), 1000, 1000));
+ }
+
+ private Connection createReceiverConnectionOnFirstAccept(final
ServerSocketChannel acceptorSocket,
+ boolean isSenderInView) {
+ try {
+ final SocketChannel readerSocket = acceptorSocket.accept();
+ final ConnectionTable readerTable = mockConnectionTable();
+ if (isSenderInView) {
+
when(readerTable.getConduit().waitForMembershipCheck(any())).thenReturn(true);
+ }
+
+ final Connection reader = spy(new Connection(readerTable,
readerSocket.socket()));
+ CompletableFuture.runAsync(() -> {
+ try {
+ reader.initReceiver();
+ } catch (final RuntimeException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ });
+ return reader;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ConnectionTable mockConnectionTable() throws UnknownHostException {
+ final ConnectionTable connectionTable = mock(ConnectionTable.class);
+ final Distribution distribution = mock(Distribution.class);
+ final DistributionManager distributionManager =
mock(DistributionManager.class);
+ final DMStats dmStats = mock(DMStats.class);
+ final CancelCriterion stopper = mock(CancelCriterion.class);
+ final SocketCloser socketCloser = mock(SocketCloser.class);
+ final TCPConduit tcpConduit = mock(TCPConduit.class);
+ final ThreadsMonitoring threadMonitoring = mock(ThreadsMonitoring.class);
+ final AbstractExecutor threadMonitoringExecutor =
mock(AbstractExecutor.class);
+ final DistributionConfig config = mock(DistributionConfig.class);
+
+ System.setProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT,
"true");
+ when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+ when(connectionTable.getConduit()).thenReturn(tcpConduit);
+ when(connectionTable.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getConfig()).thenReturn(config);
+ when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+ when(distributionManager.getDistribution()).thenReturn(distribution);
+ when(stopper.cancelInProgress()).thenReturn(null);
+ when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+ when(tcpConduit.getDM()).thenReturn(distributionManager);
+ when(tcpConduit.getSocketId()).thenReturn(new
InetSocketAddress(getLocalHost(), 10337));
+ when(tcpConduit.getStats()).thenReturn(dmStats);
+
when(distributionManager.getThreadMonitoring()).thenReturn(threadMonitoring);
+
when(threadMonitoring.createAbstractExecutor(any())).thenReturn(threadMonitoringExecutor);
+ when(tcpConduit.getConfig()).thenReturn(config);
Review comment:
Would it be possible to group these `when`s by their subject e.g. all
the `connectionTable` ones in a group, and all the `distributionManager` ones
in a group?
ditto the `when`s in the other methods
##########
File path:
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+ /**
+ * Create a sender connection and a receiver connection and pass data from
+ * one to the other.
+ *
+ * This test uses a real socket, but attempts to mock all other collaborators
+ * of connection, such as the InternalDistributedSystem.
+ */
+ @Test
+ public void
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+ throws Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, true);
+
+ // Create a sender that connects to the server socket, which should
trigger the
+ // reader to be created
+ final Connection sender = createWriter(serverSocketPort, false);
+ // Get the reader from the future
+ final Connection reader = readerFuture.get();
+
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(),
any()));
+
+ assertThat(reader.isClosing()).isFalse();
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(0)).requestClose(any());
+ }
+
+ @Test
+ public void testReceiverClosesConnectionIfMembershipCheckFailed() throws
Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, false);
+
+ final Connection sender = createWriter(serverSocketPort, true);
+
+ final Connection reader = readerFuture.get();
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() ->
assertThat(assertThat(reader.isClosing()).isTrue()));
+
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(1)).requestClose("timed out during a membership
check");
+ }
+
+ /**
+ * Start an asynchronous runnable that is waiting for a sender to connect to
the socket
+ * When the sender connects, this runnable will create a receiver connection
and
+ * return it to the future.
+ */
+ private CompletableFuture<Connection> createReaderFuture(ServerSocketChannel
acceptorSocket,
+ boolean isSenderInView) {
+ return CompletableFuture.supplyAsync(
+ () -> createReceiverConnectionOnFirstAccept(acceptorSocket,
isSenderInView));
+ }
+
+ /**
+ * Creates a socket for the receiver side. This is the server socket
listening for connections.
+ */
+ private ServerSocketChannel createReceiverSocket() throws IOException {
+ final ServerSocketChannel acceptorSocket = ServerSocketChannel.open();
+ acceptorSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), 0));
+ return acceptorSocket;
+ }
+
+ /**
+ * Creates a dummy reply message.
+ */
+ private ReplyMessage createRelyMessage(Connection sender) {
+ final ReplyMessage msg = new ReplyMessage();
+ msg.setProcessorId(1);
+ msg.setRecipient(sender.getRemoteAddress());
+ return msg;
+ }
+
+ /**
+ * Create a sender that connects to the server socket.
+ */
+ private Connection createWriter(final int serverSocketPort, boolean
isCancelInProgress)
+ throws IOException {
+ final ConnectionTable writerTable = mockConnectionTable();
+
+ final Membership<InternalDistributedMember> membership =
mock(Membership.class);
+ final TCPConduit conduit = writerTable.getConduit();
+
+
when(conduit.getCancelCriterion().isCancelInProgress()).thenReturn(isCancelInProgress);
+ when(conduit.getMembership()).thenReturn(membership);
+ when(membership.memberExists(any())).thenReturn(true);
+ final InternalDistributedMember remoteAddr =
+ new InternalDistributedMember(InetAddress.getLocalHost(), 0, true,
true);
+ remoteAddr.setDirectChannelPort(serverSocketPort);
+ final InternalDistributedMember senderAddr =
+ new InternalDistributedMember(InetAddress.getLocalHost(), 1, true,
true);
+ when(conduit.getDM().getCanonicalId(remoteAddr)).thenReturn(remoteAddr);
+ when(conduit.getDM().getCanonicalId(senderAddr)).thenReturn(senderAddr);
+ senderAddr.setDirectChannelPort(conduit.getPort());
+ when(conduit.getMemberId()).thenReturn(senderAddr);
+
+ return spy(Connection.createSender(membership, writerTable, true,
remoteAddr, true,
+ System.currentTimeMillis(), 1000, 1000));
+ }
+
+ private Connection createReceiverConnectionOnFirstAccept(final
ServerSocketChannel acceptorSocket,
+ boolean isSenderInView) {
+ try {
+ final SocketChannel readerSocket = acceptorSocket.accept();
+ final ConnectionTable readerTable = mockConnectionTable();
+ if (isSenderInView) {
+
when(readerTable.getConduit().waitForMembershipCheck(any())).thenReturn(true);
+ }
+
+ final Connection reader = spy(new Connection(readerTable,
readerSocket.socket()));
+ CompletableFuture.runAsync(() -> {
+ try {
+ reader.initReceiver();
+ } catch (final RuntimeException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ });
+ return reader;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ConnectionTable mockConnectionTable() throws UnknownHostException {
+ final ConnectionTable connectionTable = mock(ConnectionTable.class);
+ final Distribution distribution = mock(Distribution.class);
+ final DistributionManager distributionManager =
mock(DistributionManager.class);
+ final DMStats dmStats = mock(DMStats.class);
+ final CancelCriterion stopper = mock(CancelCriterion.class);
+ final SocketCloser socketCloser = mock(SocketCloser.class);
+ final TCPConduit tcpConduit = mock(TCPConduit.class);
+ final ThreadsMonitoring threadMonitoring = mock(ThreadsMonitoring.class);
+ final AbstractExecutor threadMonitoringExecutor =
mock(AbstractExecutor.class);
+ final DistributionConfig config = mock(DistributionConfig.class);
+
+ System.setProperty(SECURITY_SYSTEM_PREFIX + SECURITY_PEER_AUTH_INIT,
"true");
+ when(connectionTable.getBufferPool()).thenReturn(new BufferPool(dmStats));
+ when(connectionTable.getConduit()).thenReturn(tcpConduit);
+ when(connectionTable.getDM()).thenReturn(distributionManager);
+ when(distributionManager.getConfig()).thenReturn(config);
+ when(connectionTable.getSocketCloser()).thenReturn(socketCloser);
+ when(distributionManager.getDistribution()).thenReturn(distribution);
+ when(stopper.cancelInProgress()).thenReturn(null);
+ when(tcpConduit.getCancelCriterion()).thenReturn(stopper);
+ when(tcpConduit.getDM()).thenReturn(distributionManager);
+ when(tcpConduit.getSocketId()).thenReturn(new
InetSocketAddress(getLocalHost(), 10337));
+ when(tcpConduit.getStats()).thenReturn(dmStats);
+
when(distributionManager.getThreadMonitoring()).thenReturn(threadMonitoring);
+
when(threadMonitoring.createAbstractExecutor(any())).thenReturn(threadMonitoringExecutor);
+ when(tcpConduit.getConfig()).thenReturn(config);
+ tcpConduit.tcpBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
+
+ doAnswer(invocationOnMock -> {
+ final Runnable runnable = (invocationOnMock.getArgument(0));
+ CompletableFuture.runAsync(() -> {
+ try {
+ runnable.run();
+
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ });
+ try {
+ Thread.sleep(1000);
Review comment:
What is the purpose of this `sleep()`? My concern is that it can
contribute to nondeterminism in the test.
Could the sleep be replaced with something deterministic like a
`CountDownLatch`?
##########
File path:
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+ /**
+ * Create a sender connection and a receiver connection and pass data from
+ * one to the other.
+ *
+ * This test uses a real socket, but attempts to mock all other collaborators
Review comment:
suggest replacing "attempts to mock" with "mocks"
##########
File path:
geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.tcp;
+
+import static
org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT;
+import static
org.apache.geode.distributed.internal.DistributionConfig.DEFAULT_SOCKET_BUFFER_SIZE;
+import static
org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX;
+import static org.apache.geode.internal.inet.LocalHostUtil.getLocalHost;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.Distribution;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category(MembershipTest.class)
+public class ConnectionTransmissionTest {
+
+ /**
+ * Create a sender connection and a receiver connection and pass data from
+ * one to the other.
+ *
+ * This test uses a real socket, but attempts to mock all other collaborators
+ * of connection, such as the InternalDistributedSystem.
+ */
+ @Test
+ public void
testDataTransmittedBetweenSenderAndReceiverIfMembershipCheckPassed()
+ throws Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, true);
+
+ // Create a sender that connects to the server socket, which should
trigger the
+ // reader to be created
+ final Connection sender = createWriter(serverSocketPort, false);
+ // Get the reader from the future
+ final Connection reader = readerFuture.get();
+
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
+
+ await().untilAsserted(() -> verify(reader, times(1)).readMessage(any(),
any()));
+
+ assertThat(reader.isClosing()).isFalse();
+ verify(reader, times(1)).readHandshakeForReceiver(any());
+ verify(reader, times(0)).requestClose(any());
+ }
+
+ @Test
+ public void testReceiverClosesConnectionIfMembershipCheckFailed() throws
Exception {
+ final DMStats stats = mock(DMStats.class);
+ final BufferPool bufferPool = new BufferPool(stats);
+ final ServerSocketChannel acceptorSocket = createReceiverSocket();
+
+ final int serverSocketPort = acceptorSocket.socket().getLocalPort();
+ final CompletableFuture<Connection> readerFuture =
createReaderFuture(acceptorSocket, false);
+
+ final Connection sender = createWriter(serverSocketPort, true);
+
+ final Connection reader = readerFuture.get();
+ final ReplyMessage msg = createRelyMessage(sender);
+
+ final List<Connection> connections = new ArrayList<>();
+ connections.add(sender);
+
+ final BaseMsgStreamer streamer = MsgStreamer.create(connections, msg,
false, stats, bufferPool);
+ streamer.writeMessage();
Review comment:
All the setup in this method from about line 102 to about line 118 is
identical to logic in the previous test (with the exception of the
`isSenderInView` parameter to `createReaderFuture()`.
Would it make sense to turn that duplicated code into a method that takes a
`isSenderInView` parameter?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
##########
@@ -2960,24 +2964,15 @@ private boolean readHandshakeForReceiver(final
DataInput dis) {
final boolean isSecure = authInit != null && !authInit.isEmpty();
if (isSecure) {
- if (owner.getConduit().waitForMembershipCheck(remoteMember)) {
- sendOKHandshakeReply();
- notifyHandshakeWaiter(true);
- } else {
- // check if we need notifyHandshakeWaiter() call.
+ if (!owner.getConduit().waitForMembershipCheck(remoteMember)) {
notifyHandshakeWaiter(false);
- logger.warn("{} timed out during a membership check.",
- p2pReaderName());
+ logger.warn("{} timed out during a membership check.",
p2pReaderName());
+ requestClose("timed out during a membership check");
return true;
}
- } else {
- sendOKHandshakeReply();
- try {
- notifyHandshakeWaiter(true);
- } catch (Exception e) {
- logger.fatal("Uncaught exception from listener", e);
- }
}
+ sendOKHandshakeReply();
+ notifyHandshakeWaiter(true);
Review comment:
The logic simplifications in this method make the flow easier to
understand.
The only functional change I see (besides the new `requestClose()`, is that
if `notifyHandshakeWaiter()` throws an `Exception` we won't log a fatal-level
message. That catch block was introduced by:
GEODE-2113 Implement SSL over NIO
I perused `notifyHandshakeWaiter()` and the methods it calls and I don't see
any possibility of unhandled exceptions. As a result I think it's ok to remove
that catch block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]