This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 7d55653 IGNITE-12818 SoLinger should be used for all sockets in discovery - Fixes #7557. 7d55653 is described below commit 7d55653abc029c18d45a1fba9ab5a9f778f7c80d Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Thu Apr 2 13:34:08 2020 +0300 IGNITE-12818 SoLinger should be used for all sockets in discovery - Fixes #7557. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../ignite/spi/discovery/tcp/ClientImpl.java | 10 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 3 +- .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java | 18 +- .../discovery/tcp/DiscoveryClientSocketTest.java | 184 +++++++++++++++++++++ .../IgniteSpiDiscoverySelfTestSuite.java | 5 +- 5 files changed, 210 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 2a9168c..d45dcf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -1122,8 +1122,9 @@ class ClientImpl extends TcpDiscoveryImpl { try { InputStream in = sockStream.stream(); - sock.setKeepAlive(true); - sock.setTcpNoDelay(true); + assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" + + " KeepAlive " + sock.getKeepAlive() + + " TcpNoDelay " + sock.getTcpNoDelay(); while (!isInterrupted()) { TcpDiscoveryAbstractMessage msg; @@ -1552,8 +1553,9 @@ class ClientImpl extends TcpDiscoveryImpl { InputStream in = sockStream.stream(); - sock.setKeepAlive(true); - sock.setTcpNoDelay(true); + assert sock.getKeepAlive() && sock.getTcpNoDelay() : "Socket wasn't configured properly:" + + " KeepAlive " + sock.getKeepAlive() + + " TcpNoDelay " + sock.getTcpNoDelay(); List<TcpDiscoveryAbstractMessage> msgs = null; diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index e6f4136..2886478 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -6470,8 +6470,7 @@ class ServerImpl extends TcpDiscoveryImpl { try { // Set socket options. - sock.setKeepAlive(true); - sock.setTcpNoDelay(true); + spi.configureSocketOptions(sock); int timeout = sock.getSoTimeout(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index d13e25f..6853cdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -1572,9 +1572,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery sock.bind(new InetSocketAddress(locHost, 0)); - sock.setTcpNoDelay(true); - - sock.setSoLinger(getSoLinger() >= 0, getSoLinger()); + configureSocketOptions(sock); return sock; } catch (IOException e) { @@ -1631,6 +1629,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscovery } /** + * Configures socket options. + * + * @param sock Socket. + * @throws SocketException If failed. + */ + void configureSocketOptions(Socket sock) throws SocketException { + sock.setTcpNoDelay(true); + + sock.setSoLinger(getSoLinger() >= 0, getSoLinger()); + + sock.setKeepAlive(true); + } + + /** * Writes message to the socket. * * @param sock Socket. diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java new file mode 100644 index 0000000..c94ad1b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Arrays; +import javax.net.SocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocketFactory; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.ssl.SslContextFactory; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Check a ssl socket configuration which used in discovery. + */ +public class DiscoveryClientSocketTest { + /** Port to listen. */ + public static final int PORT_TO_LNSR = 12346; + + /** Host for bind. */ + public static final String HOST = "localhost"; + + /** SSL server socket factory. */ + private SSLServerSocketFactory sslSrvSockFactory; + + /** SSL socket factory. */ + private SocketFactory sslSockFactory; + + /** Fake TCP discovery SPI. */ + private TcpDiscoverySpi fakeTcpDiscoverySpi; + + /** + * Configure SSL and Discovery. + */ + @Before + public void before() { + SslContextFactory socketFactory = (SslContextFactory)GridTestUtils.sslTrustedFactory("node01", "trustone"); + SSLContext sslCtx = socketFactory.create(); + + sslSrvSockFactory = sslCtx.getServerSocketFactory(); + sslSockFactory = sslCtx.getSocketFactory(); + fakeTcpDiscoverySpi = new TcpDiscoverySpi(); + + fakeTcpDiscoverySpi.setSoLinger(1); + } + + /** + * It creates a SSL socket server and client for checks correctness closing when write exceed read. + * + * @throws Exception If failed. + */ + @Test + public void sslSocketTest() throws Exception { + try (ServerSocket listen = sslSrvSockFactory.createServerSocket(PORT_TO_LNSR)) { + System.out.println("Server started."); + + IgniteInternalFuture clientFut = GridTestUtils.runAsync(this::startSslClient); + + Socket connection = listen.accept(); + + try { + fakeTcpDiscoverySpi.configureSocketOptions(connection); + + InputStream in = connection.getInputStream(); + OutputStream out = connection.getOutputStream(); + + readHadshake(connection); + + connection.getOutputStream().write(U.IGNITE_HEADER); + + clientFut.get(10_000); + } + catch (IgniteFutureTimeoutCheckedException e) { + U.closeQuiet(connection); + + fail("Can't wait connection closed from client side."); + } + catch (Exception e) { + U.closeQuiet(connection); + + System.out.println("Ex: " + e.getMessage() + " (Socket closed)"); + } + } + } + + /** + * Reads handshake bytes and checks correctness. + * + * @param connection Socket connection. + * @throws IOException If have some issue happens in time read from socket. + */ + public void readHadshake(Socket connection) throws IOException { + byte[] buf = new byte[4]; + int read = 0; + + while (read < buf.length) { + int r = connection.getInputStream().read(buf, read, buf.length - read); + + if (r >= 0) + read += r; + else + fail("Failed to read from socket."); + } + + assertEquals("Handshake did not pass, readed bytes: " + read, Arrays.asList(U.IGNITE_HEADER), Arrays.asList(U.IGNITE_HEADER)); + } + + /** + * Test starts ssl client socket and writes data until socket's write blocking. When the socket is blocking on write + * tries to close it. + */ + public void startSslClient() { + try (Socket clientSocket = sslSockFactory.createSocket(HOST, PORT_TO_LNSR)) { + System.out.println("Client started."); + + fakeTcpDiscoverySpi.configureSocketOptions(clientSocket); + + long handshakeStartTime = System.currentTimeMillis(); + + //need to send message in order to ssl handshake passed. + clientSocket.getOutputStream().write(U.IGNITE_HEADER); + + readHadshake(clientSocket); + + long handshakeInterval = System.currentTimeMillis() - handshakeStartTime; + + System.out.println("Handshake time: " + handshakeInterval + "ms"); + + int iter = 0; + + try { + while (true) { + iter++; + + IgniteInternalFuture writeFut = GridTestUtils.runAsync(() -> { + try { + clientSocket.getOutputStream().write(new byte[4 * 1024]); + } + catch (IOException e) { + assertEquals("Socket closed", e.getMessage()); + } + }); + + writeFut.get(10 * handshakeInterval); + } + } + catch (IgniteFutureTimeoutCheckedException e) { + System.out.println("Socket stuck on write, when passed " + (iter * 4) + "KB through itself."); + } + + System.out.println("Try to close a socket."); //see in try-catch-resource + } + catch (Exception e) { + fail(e.getMessage()); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java index 00704a5..22931a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest; import org.apache.ignite.spi.discovery.IgniteClientReconnectEventHandlingTest; import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest; import org.apache.ignite.spi.discovery.LongClientConnectToClusterTest; +import org.apache.ignite.spi.discovery.tcp.DiscoveryClientSocketTest; import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectSslTest; import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest; @@ -162,7 +163,9 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP TcpDiscoveryReconnectUnstableTopologyTest.class, - IgniteMetricsOverflowTest.class + IgniteMetricsOverflowTest.class, + + DiscoveryClientSocketTest.class }) public class IgniteSpiDiscoverySelfTestSuite { /** */