This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d55653  IGNITE-12818 SoLinger should be used for all sockets in 
discovery - Fixes #7557.
7d55653 is described below

commit 7d55653abc029c18d45a1fba9ab5a9f778f7c80d
Author: Vladislav Pyatkov <vldpyat...@gmail.com>
AuthorDate: Thu Apr 2 13:34:08 2020 +0300

    IGNITE-12818 SoLinger should be used for all sockets in discovery - Fixes 
#7557.
    
    Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>
---
 .../ignite/spi/discovery/tcp/ClientImpl.java       |  10 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       |   3 +-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpi.java  |  18 +-
 .../discovery/tcp/DiscoveryClientSocketTest.java   | 184 +++++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java           |   5 +-
 5 files changed, 210 insertions(+), 10 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 2a9168c..d45dcf7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -1122,8 +1122,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 try {
                     InputStream in = sockStream.stream();
 
-                    sock.setKeepAlive(true);
-                    sock.setTcpNoDelay(true);
+                    assert sock.getKeepAlive() && sock.getTcpNoDelay() : 
"Socket wasn't configured properly:" +
+                        " KeepAlive " + sock.getKeepAlive() +
+                        " TcpNoDelay " + sock.getTcpNoDelay();
 
                     while (!isInterrupted()) {
                         TcpDiscoveryAbstractMessage msg;
@@ -1552,8 +1553,9 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         InputStream in = sockStream.stream();
 
-                        sock.setKeepAlive(true);
-                        sock.setTcpNoDelay(true);
+                        assert sock.getKeepAlive() && sock.getTcpNoDelay() : 
"Socket wasn't configured properly:" +
+                            " KeepAlive " + sock.getKeepAlive() +
+                            " TcpNoDelay " + sock.getTcpNoDelay();
 
                         List<TcpDiscoveryAbstractMessage> msgs = null;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index e6f4136..2886478 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -6470,8 +6470,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 try {
                     // Set socket options.
-                    sock.setKeepAlive(true);
-                    sock.setTcpNoDelay(true);
+                    spi.configureSocketOptions(sock);
 
                     int timeout = sock.getSoTimeout();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index d13e25f..6853cdd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1572,9 +1572,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
 
             sock.bind(new InetSocketAddress(locHost, 0));
 
-            sock.setTcpNoDelay(true);
-
-            sock.setSoLinger(getSoLinger() >= 0, getSoLinger());
+            configureSocketOptions(sock);
 
             return sock;
         } catch (IOException e) {
@@ -1631,6 +1629,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter 
implements IgniteDiscovery
     }
 
     /**
+     * Applies the common discovery socket options: TCP_NODELAY, SO_LINGER and keep-alive.
+     *
+     * @param sock Socket to configure.
+     * @throws SocketException If an option could not be set.
+     */
+    void configureSocketOptions(Socket sock) throws SocketException {
+        int linger = getSoLinger();
+
+        sock.setTcpNoDelay(true);
+        sock.setSoLinger(linger >= 0, linger); // Linger is switched on only for a non-negative value.
+        sock.setKeepAlive(true);
+    }
+
+    /**
      * Writes message to the socket.
      *
      * @param sock Socket.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java
new file mode 100644
index 0000000..c94ad1b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/DiscoveryClientSocketTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocketFactory;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.ssl.SslContextFactory;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Checks the SSL socket configuration that is used in discovery.
+ */
+public class DiscoveryClientSocketTest {
+    /** Port to listen. */
+    public static final int PORT_TO_LNSR = 12346;
+
+    /** Host for bind. */
+    public static final String HOST = "localhost";
+
+    /** SSL server socket factory. */
+    private SSLServerSocketFactory sslSrvSockFactory;
+
+    /** SSL socket factory. */
+    private SocketFactory sslSockFactory;
+
+    /** Fake TCP discovery SPI. */
+    private TcpDiscoverySpi fakeTcpDiscoverySpi;
+
+    /**
+     * Builds the SSL server/client socket factories and a discovery SPI instance with SO_LINGER set to 1.
+     */
+    @Before
+    public void before() {
+        SslContextFactory ctxFactory = (SslContextFactory)GridTestUtils.sslTrustedFactory("node01", "trustone");
+        SSLContext ctx = ctxFactory.create();
+
+        sslSockFactory = ctx.getSocketFactory();
+        sslSrvSockFactory = ctx.getServerSocketFactory();
+
+        fakeTcpDiscoverySpi = new TcpDiscoverySpi();
+        fakeTcpDiscoverySpi.setSoLinger(1);
+    }
+
+    /**
+     * Starts an SSL server socket and an asynchronous SSL client, exchanges the handshake header with it and
+     * waits until the client finishes, i.e. until it has managed to close its socket.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void sslSocketTest() throws Exception {
+        try (ServerSocket listen = sslSrvSockFactory.createServerSocket(PORT_TO_LNSR)) {
+            System.out.println("Server started.");
+
+            IgniteInternalFuture<?> clientFut = GridTestUtils.runAsync(this::startSslClient);
+
+            Socket connection = listen.accept();
+
+            try {
+                fakeTcpDiscoverySpi.configureSocketOptions(connection);
+
+                readHadshake(connection);
+
+                connection.getOutputStream().write(U.IGNITE_HEADER);
+
+                // Nothing more is read on this side; only wait for the client to complete.
+                clientFut.get(10_000);
+            }
+            catch (IgniteFutureTimeoutCheckedException e) {
+                fail("Can't wait connection closed from client side.");
+            }
+            catch (Exception e) {
+                // NOTE(review): other failures (presumably incl. a failed client future) are only logged - confirm.
+                System.out.println("Ex: " + e.getMessage() + " (Socket closed)");
+            }
+            finally {
+                // Release the accepted socket on every path, including the successful one.
+                U.closeQuiet(connection);
+            }
+        }
+    }
+
+    /**
+     * Reads handshake bytes and checks correctness.
+     *
+     * @param connection Socket connection.
+     * @throws IOException If have some issue happens in time read from socket.
+     */
+    public void readHadshake(Socket connection) throws IOException {
+        byte[] buf = new byte[4];
+        int read = 0;
+
+        while (read < buf.length) {
+            int r = connection.getInputStream().read(buf, read, buf.length - 
read);
+
+            if (r >= 0)
+                read += r;
+            else
+                fail("Failed to read from socket.");
+        }
+
+        assertEquals("Handshake did not pass, readed bytes: " + read, 
Arrays.asList(U.IGNITE_HEADER), Arrays.asList(U.IGNITE_HEADER));
+    }
+
+    /**
+     * Starts an SSL client socket, passes the handshake and keeps writing 4KB chunks until a write does not
+     * complete in time (10 measured handshake intervals); the socket is then closed by try-with-resources.
+     */
+    public void startSslClient() {
+        try (Socket clientSocket = sslSockFactory.createSocket(HOST, PORT_TO_LNSR)) {
+            System.out.println("Client started.");
+
+            fakeTcpDiscoverySpi.configureSocketOptions(clientSocket);
+
+            long handshakeStartTime = System.currentTimeMillis();
+
+            // A message has to be sent so that the SSL handshake actually takes place.
+            clientSocket.getOutputStream().write(U.IGNITE_HEADER);
+
+            readHadshake(clientSocket);
+
+            long handshakeInterval = System.currentTimeMillis() - handshakeStartTime;
+
+            System.out.println("Handshake time: " + handshakeInterval + "ms");
+
+            int iter = 0;
+
+            try {
+                while (true) {
+                    iter++;
+
+                    IgniteInternalFuture<?> writeFut = GridTestUtils.runAsync(() -> {
+                        try {
+                            clientSocket.getOutputStream().write(new byte[4 * 1024]);
+                        }
+                        catch (IOException e) {
+                            assertEquals("Socket closed", e.getMessage());
+                        }
+                    });
+
+                    writeFut.get(10 * handshakeInterval);
+                }
+            }
+            catch (IgniteFutureTimeoutCheckedException e) {
+                System.out.println("Socket stuck on write, when passed " + (iter * 4) + "KB through itself.");
+            }
+
+            System.out.println("Try to close a socket."); // The actual close is done by try-with-resources.
+        }
+        catch (Exception e) {
+            throw new AssertionError(e.getMessage(), e); // Same error type as fail(), but keeps the cause.
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 00704a5..22931a8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
 import org.apache.ignite.spi.discovery.IgniteClientReconnectEventHandlingTest;
 import org.apache.ignite.spi.discovery.IgniteDiscoveryCacheReuseSelfTest;
 import org.apache.ignite.spi.discovery.LongClientConnectToClusterTest;
+import org.apache.ignite.spi.discovery.tcp.DiscoveryClientSocketTest;
 import org.apache.ignite.spi.discovery.tcp.DiscoveryUnmarshalVulnerabilityTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectSslTest;
 import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
@@ -162,7 +163,9 @@ import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
 
     TcpDiscoveryReconnectUnstableTopologyTest.class,
 
-    IgniteMetricsOverflowTest.class
+    IgniteMetricsOverflowTest.class,
+
+    DiscoveryClientSocketTest.class
 })
 public class IgniteSpiDiscoverySelfTestSuite {
     /** */

Reply via email to