Repository: asterixdb
Updated Branches:
  refs/heads/master a81b7954b -> 4085b48f6


[NO ISSUE][NET] Networking Improvements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Set keep alive and no TCP delay options
  on socket channels.
- Cancel key and close IPC handle on failed
  read/writes to avoid getting the same failures
  with every NetworkThread loop.

Change-Id: I60c1f9cfe2ea577fca14cd2e98c6461c49df011a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2418
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4085b48f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4085b48f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4085b48f

Branch: refs/heads/master
Commit: 4085b48f6de4d9e2a726dfd7221bc3e70b1c8e32
Parents: a81b795
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Thu Feb 22 21:29:51 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Thu Feb 22 15:31:50 2018 -0800

----------------------------------------------------------------------
 .../replication/api/PartitionReplica.java       |   3 +-
 .../replication/api/ReplicationDestination.java |   2 +
 hyracks-fullstack/hyracks/hyracks-ipc/pom.xml   |   5 +
 .../hyracks/ipc/impl/IPCConnectionManager.java  | 110 +++++++++++--------
 hyracks-fullstack/hyracks/hyracks-net/pom.xml   |   5 +
 .../hyracks/net/protocols/tcp/TCPEndpoint.java  |  17 +--
 .../org/apache/hyracks/util/NetworkUtil.java    |  49 +++++++++
 7 files changed, 139 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index d3ddc43..bfac451 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -32,7 +32,7 @@ import 
org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import org.apache.asterix.replication.sync.ReplicaSynchronizer;
-import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.hyracks.util.StorageUtil;
 import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
@@ -97,6 +97,7 @@ public class PartitionReplica implements IPartitionReplica {
         try {
             if (sc == null || !sc.isOpen() || !sc.isConnected()) {
                 sc = SocketChannel.open();
+                NetworkUtil.configure(sc);
                 sc.configureBlocking(true);
                 sc.connect(id.getLocation());
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
index a092322..8ccfced 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java
@@ -30,6 +30,7 @@ import 
org.apache.asterix.common.exceptions.ReplicationException;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.replication.IReplicationDestination;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -78,6 +79,7 @@ public class ReplicationDestination implements 
IReplicationDestination {
         try {
             if (logRepChannel == null || !logRepChannel.isOpen() || 
!logRepChannel.isConnected()) {
                 logRepChannel = SocketChannel.open();
+                NetworkUtil.configure(logRepChannel);
                 logRepChannel.configureBlocking(true);
                 logRepChannel.connect(location);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index 7f492eb..7f59db1 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -53,5 +53,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 86c8c75..3e6c64b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.ipc.impl;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.StandardSocketOptions;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channel;
 import java.nio.channels.ClosedChannelException;
@@ -41,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -218,14 +218,12 @@ public class IPCConnectionManager {
                     if (!workingPendingConnections.isEmpty()) {
                         for (IPCHandle handle : workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
-                            openChannels.add(channel);
-                            
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                            channel.configureBlocking(false);
+                            register(channel);
                             SelectionKey cKey;
                             if (channel.connect(handle.getRemoteAddress())) {
                                 cKey = channel.register(selector, 
SelectionKey.OP_READ);
                                 handle.setState(HandleState.CONNECT_SENT);
-                                write(createInitialReqMessage(handle));
+                                
IPCConnectionManager.this.write(createInitialReqMessage(handle));
                             } else {
                                 cKey = channel.register(selector, 
SelectionKey.OP_CONNECT);
                             }
@@ -273,48 +271,15 @@ public class IPCConnectionManager {
                         for (Iterator<SelectionKey> i = 
selector.selectedKeys().iterator(); i.hasNext();) {
                             SelectionKey key = i.next();
                             i.remove();
-                            SelectableChannel sc = key.channel();
+                            final SelectableChannel sc = key.channel();
                             if (key.isReadable()) {
-                                SocketChannel channel = (SocketChannel) sc;
-                                IPCHandle handle = (IPCHandle) 
key.attachment();
-                                ByteBuffer readBuffer = handle.getInBuffer();
-                                int len = channel.read(readBuffer);
-                                
system.getPerformanceCounters().addMessageBytesReceived(len);
-                                if (len < 0) {
-                                    key.cancel();
-                                    IOUtils.closeQuietly(channel);
-                                    openChannels.remove(channel);
-                                    handle.close();
-                                } else {
-                                    handle.processIncomingMessages();
-                                    if (!readBuffer.hasRemaining()) {
-                                        handle.resizeInBuffer();
-                                    }
-                                }
+                                read(key);
                             } else if (key.isWritable()) {
-                                SocketChannel channel = (SocketChannel) sc;
-                                IPCHandle handle = (IPCHandle) 
key.attachment();
-                                ByteBuffer writeBuffer = handle.getOutBuffer();
-                                int len = channel.write(writeBuffer);
-                                
system.getPerformanceCounters().addMessageBytesSent(len);
-                                if (len < 0) {
-                                    key.cancel();
-                                    IOUtils.closeQuietly(channel);
-                                    openChannels.remove(channel);
-                                    handle.close();
-                                } else if (!writeBuffer.hasRemaining()) {
-                                    key.interestOps(key.interestOps() & 
~SelectionKey.OP_WRITE);
-                                }
-                                if (handle.full()) {
-                                    handle.clearFull();
-                                    selector.wakeup();
-                                }
+                                write(key);
                             } else if (key.isAcceptable()) {
                                 assert sc == serverSocketChannel;
                                 SocketChannel channel = 
serverSocketChannel.accept();
-                                openChannels.add(channel);
-                                
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                                channel.configureBlocking(false);
+                                register(channel);
                                 IPCHandle handle = new IPCHandle(system, null);
                                 SelectionKey cKey = channel.register(selector, 
SelectionKey.OP_READ);
                                 handle.setKey(cKey);
@@ -331,7 +296,7 @@ public class IPCConnectionManager {
                                 handle.setState(HandleState.CONNECT_SENT);
                                 registerHandle(handle);
                                 key.interestOps(SelectionKey.OP_READ);
-                                write(createInitialReqMessage(handle));
+                                
IPCConnectionManager.this.write(createInitialReqMessage(handle));
                             }
                         }
                     }
@@ -378,6 +343,65 @@ public class IPCConnectionManager {
             workingSendList.clear();
             moveAll(tempUnsentMessages, workingSendList);
         }
+
+        private void register(SocketChannel channel) throws IOException {
+            openChannels.add(channel);
+            NetworkUtil.configure(channel);
+            channel.configureBlocking(false);
+        }
+
+        private void read(SelectionKey readableKey) {
+            SocketChannel channel = (SocketChannel) readableKey.channel();
+            IPCHandle handle = (IPCHandle) readableKey.attachment();
+            ByteBuffer readBuffer = handle.getInBuffer();
+            try {
+                int len = channel.read(readBuffer);
+                if (len < 0) {
+                    close(readableKey, channel);
+                    return;
+                }
+                system.getPerformanceCounters().addMessageBytesReceived(len);
+                handle.processIncomingMessages();
+                if (!readBuffer.hasRemaining()) {
+                    handle.resizeInBuffer();
+                }
+            } catch (IOException e) {
+                LOGGER.error("TCP read error from {}", 
handle.getRemoteAddress(), e);
+                close(readableKey, channel);
+            }
+        }
+
+        private void write(SelectionKey writableKey) {
+            SocketChannel channel = (SocketChannel) writableKey.channel();
+            IPCHandle handle = (IPCHandle) writableKey.attachment();
+            ByteBuffer writeBuffer = handle.getOutBuffer();
+            try {
+                int len = channel.write(writeBuffer);
+                if (len < 0) {
+                    close(writableKey, channel);
+                    return;
+                }
+                system.getPerformanceCounters().addMessageBytesSent(len);
+                if (!writeBuffer.hasRemaining()) {
+                    writableKey.interestOps(writableKey.interestOps() & 
~SelectionKey.OP_WRITE);
+                }
+                if (handle.full()) {
+                    handle.clearFull();
+                    selector.wakeup();
+                }
+            } catch (IOException e) {
+                LOGGER.error("TCP write error to {}", 
handle.getRemoteAddress(), e);
+                close(writableKey, channel);
+            }
+        }
+
+        private void close(SelectionKey key, SocketChannel sc) {
+            key.cancel();
+            NetworkUtil.closeQuietly(sc);
+            openChannels.remove(sc);
+            final IPCHandle handle = (IPCHandle) key.attachment();
+            handle.close();
+        }
     }
 
     private <T> void moveAll(List<T> source, List<T> target) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-net/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 36e33c5..1040e81 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -55,5 +55,10 @@
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 4633cf3..b2efe7f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.net.protocols.tcp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
-import java.net.StandardSocketOptions;
 import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -31,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -129,9 +129,7 @@ public class TCPEndpoint {
                     if (!workingPendingConnections.isEmpty()) {
                         for (InetSocketAddress address : 
workingPendingConnections) {
                             SocketChannel channel = SocketChannel.open();
-                            
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                            
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
-                            channel.configureBlocking(false);
+                            register(channel);
                             boolean connect = false;
                             boolean failure = false;
                             try {
@@ -156,9 +154,7 @@ public class TCPEndpoint {
                     }
                     if (!workingIncomingConnections.isEmpty()) {
                         for (SocketChannel channel : 
workingIncomingConnections) {
-                            
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
-                            
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
-                            channel.configureBlocking(false);
+                            register(channel);
                             SelectionKey sKey = channel.register(selector, 0);
                             TCPConnection connection = new 
TCPConnection(TCPEndpoint.this, channel, sKey, selector);
                             sKey.attach(connection);
@@ -211,7 +207,7 @@ public class TCPEndpoint {
                         }
                     }
                 } catch (Exception e) {
-                    LOGGER.error("Error in TCPEndpoint " + localAddress, e);
+                    LOGGER.error("Error in TCPEndpoint {}", localAddress, e);
                 }
             }
         }
@@ -250,5 +246,10 @@ public class TCPEndpoint {
                 incomingConnections.clear();
             }
         }
+
+        private void register(SocketChannel channel) throws IOException {
+            NetworkUtil.configure(channel);
+            channel.configureBlocking(false);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
new file mode 100644
index 0000000..f9f45c1
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.util;
+
+import java.io.IOException;
+import java.net.StandardSocketOptions;
+import java.nio.channels.SocketChannel;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class NetworkUtil {
+
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private NetworkUtil() {
+    }
+
+    public static void configure(SocketChannel sc) throws IOException {
+        sc.setOption(StandardSocketOptions.TCP_NODELAY, true);
+        sc.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
+    }
+
+    public static void closeQuietly(SocketChannel sc) {
+        if (sc.isOpen()) {
+            try {
+                sc.close();
+            } catch (IOException e) {
+                LOGGER.warn("Failed to close socket", e);
+            }
+        }
+    }
+}

Reply via email to