Repository: asterixdb
Updated Branches:
  refs/heads/master a81b7954b -> 4085b48f6
[NO ISSUE][NET] Networking Improvements

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Set keep alive and no TCP delay options on socket channels.
- Cancel key and close IPC handle on failed read/writes to avoid getting the same failures with every NetworkThread loop.

Change-Id: I60c1f9cfe2ea577fca14cd2e98c6461c49df011a
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2418
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/4085b48f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/4085b48f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/4085b48f

Branch: refs/heads/master
Commit: 4085b48f6de4d9e2a726dfd7221bc3e70b1c8e32
Parents: a81b795
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Thu Feb 22 21:29:51 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Thu Feb 22 15:31:50 2018 -0800

----------------------------------------------------------------------
 .../replication/api/PartitionReplica.java | 3 +-
 .../replication/api/ReplicationDestination.java | 2 +
 hyracks-fullstack/hyracks/hyracks-ipc/pom.xml | 5 +
 .../hyracks/ipc/impl/IPCConnectionManager.java | 110 +++++++++++--------
 hyracks-fullstack/hyracks/hyracks-net/pom.xml | 5 +
 .../hyracks/net/protocols/tcp/TCPEndpoint.java | 17 +--
 .../org/apache/hyracks/util/NetworkUtil.java | 49 +++++++++
 7 files changed, 139 insertions(+), 52 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java index d3ddc43..bfac451 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java @@ -32,7 +32,7 @@ import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.storage.ReplicaIdentifier; import org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.asterix.replication.sync.ReplicaSynchronizer; -import org.apache.hyracks.util.JSONUtil; +import org.apache.hyracks.util.NetworkUtil; import org.apache.hyracks.util.StorageUtil; import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.LogManager; @@ -97,6 +97,7 @@ public class PartitionReplica implements IPartitionReplica { try { if (sc == null || !sc.isOpen() || !sc.isConnected()) { sc = SocketChannel.open(); + NetworkUtil.configure(sc); sc.configureBlocking(true); sc.connect(id.getLocation()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java index a092322..8ccfced 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java +++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/ReplicationDestination.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.ReplicationException; import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.replication.IReplicationDestination; import org.apache.asterix.replication.messaging.ReplicationProtocol; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -78,6 +79,7 @@ public class ReplicationDestination implements IReplicationDestination { try { if (logRepChannel == null || !logRepChannel.isOpen() || !logRepChannel.isConnected()) { logRepChannel = SocketChannel.open(); + NetworkUtil.configure(logRepChannel); logRepChannel.configureBlocking(true); logRepChannel.connect(location); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml index 7f492eb..7f59db1 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml @@ -53,5 +53,10 @@ <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java 
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 86c8c75..3e6c64b 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -21,7 +21,6 @@ package org.apache.hyracks.ipc.impl; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.ClosedChannelException; @@ -41,6 +40,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.io.IOUtils; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -218,14 +218,12 @@ public class IPCConnectionManager { if (!workingPendingConnections.isEmpty()) { for (IPCHandle handle : workingPendingConnections) { SocketChannel channel = SocketChannel.open(); - openChannels.add(channel); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.configureBlocking(false); + register(channel); SelectionKey cKey; if (channel.connect(handle.getRemoteAddress())) { cKey = channel.register(selector, SelectionKey.OP_READ); handle.setState(HandleState.CONNECT_SENT); - write(createInitialReqMessage(handle)); + IPCConnectionManager.this.write(createInitialReqMessage(handle)); } else { cKey = channel.register(selector, SelectionKey.OP_CONNECT); } @@ -273,48 +271,15 @@ public class IPCConnectionManager { for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); - SelectableChannel sc = key.channel(); + final SelectableChannel sc = key.channel(); if (key.isReadable()) { - SocketChannel channel = (SocketChannel) sc; - IPCHandle 
handle = (IPCHandle) key.attachment(); - ByteBuffer readBuffer = handle.getInBuffer(); - int len = channel.read(readBuffer); - system.getPerformanceCounters().addMessageBytesReceived(len); - if (len < 0) { - key.cancel(); - IOUtils.closeQuietly(channel); - openChannels.remove(channel); - handle.close(); - } else { - handle.processIncomingMessages(); - if (!readBuffer.hasRemaining()) { - handle.resizeInBuffer(); - } - } + read(key); } else if (key.isWritable()) { - SocketChannel channel = (SocketChannel) sc; - IPCHandle handle = (IPCHandle) key.attachment(); - ByteBuffer writeBuffer = handle.getOutBuffer(); - int len = channel.write(writeBuffer); - system.getPerformanceCounters().addMessageBytesSent(len); - if (len < 0) { - key.cancel(); - IOUtils.closeQuietly(channel); - openChannels.remove(channel); - handle.close(); - } else if (!writeBuffer.hasRemaining()) { - key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); - } - if (handle.full()) { - handle.clearFull(); - selector.wakeup(); - } + write(key); } else if (key.isAcceptable()) { assert sc == serverSocketChannel; SocketChannel channel = serverSocketChannel.accept(); - openChannels.add(channel); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.configureBlocking(false); + register(channel); IPCHandle handle = new IPCHandle(system, null); SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ); handle.setKey(cKey); @@ -331,7 +296,7 @@ public class IPCConnectionManager { handle.setState(HandleState.CONNECT_SENT); registerHandle(handle); key.interestOps(SelectionKey.OP_READ); - write(createInitialReqMessage(handle)); + IPCConnectionManager.this.write(createInitialReqMessage(handle)); } } } @@ -378,6 +343,65 @@ public class IPCConnectionManager { workingSendList.clear(); moveAll(tempUnsentMessages, workingSendList); } + + private void register(SocketChannel channel) throws IOException { + openChannels.add(channel); + NetworkUtil.configure(channel); + 
channel.configureBlocking(false); + } + + private void read(SelectionKey readableKey) { + SocketChannel channel = (SocketChannel) readableKey.channel(); + IPCHandle handle = (IPCHandle) readableKey.attachment(); + ByteBuffer readBuffer = handle.getInBuffer(); + try { + int len = channel.read(readBuffer); + if (len < 0) { + close(readableKey, channel); + return; + } + system.getPerformanceCounters().addMessageBytesReceived(len); + handle.processIncomingMessages(); + if (!readBuffer.hasRemaining()) { + handle.resizeInBuffer(); + } + } catch (IOException e) { + LOGGER.error("TCP read error from {}", handle.getRemoteAddress(), e); + close(readableKey, channel); + } + } + + private void write(SelectionKey writableKey) { + SocketChannel channel = (SocketChannel) writableKey.channel(); + IPCHandle handle = (IPCHandle) writableKey.attachment(); + ByteBuffer writeBuffer = handle.getOutBuffer(); + try { + int len = channel.write(writeBuffer); + if (len < 0) { + close(writableKey, channel); + return; + } + system.getPerformanceCounters().addMessageBytesSent(len); + if (!writeBuffer.hasRemaining()) { + writableKey.interestOps(writableKey.interestOps() & ~SelectionKey.OP_WRITE); + } + if (handle.full()) { + handle.clearFull(); + selector.wakeup(); + } + } catch (IOException e) { + LOGGER.error("TCP write error to {}", handle.getRemoteAddress(), e); + close(writableKey, channel); + } + } + + private void close(SelectionKey key, SocketChannel sc) { + key.cancel(); + NetworkUtil.closeQuietly(sc); + openChannels.remove(sc); + final IPCHandle handle = (IPCHandle) key.attachment(); + handle.close(); + } } private <T> void moveAll(List<T> source, List<T> target) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-net/pom.xml ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml index 36e33c5..1040e81 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-net/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml @@ -55,5 +55,10 @@ <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-util</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java index 4633cf3..b2efe7f 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java @@ -21,7 +21,6 @@ package org.apache.hyracks.net.protocols.tcp; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.net.StandardSocketOptions; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -31,6 +30,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -129,9 +129,7 @@ public class TCPEndpoint { if (!workingPendingConnections.isEmpty()) { for (InetSocketAddress address : workingPendingConnections) { SocketChannel channel = SocketChannel.open(); - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - 
channel.configureBlocking(false); + register(channel); boolean connect = false; boolean failure = false; try { @@ -156,9 +154,7 @@ public class TCPEndpoint { } if (!workingIncomingConnections.isEmpty()) { for (SocketChannel channel : workingIncomingConnections) { - channel.setOption(StandardSocketOptions.TCP_NODELAY, true); - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); - channel.configureBlocking(false); + register(channel); SelectionKey sKey = channel.register(selector, 0); TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector); sKey.attach(connection); @@ -211,7 +207,7 @@ public class TCPEndpoint { } } } catch (Exception e) { - LOGGER.error("Error in TCPEndpoint " + localAddress, e); + LOGGER.error("Error in TCPEndpoint {}", localAddress, e); } } } @@ -250,5 +246,10 @@ public class TCPEndpoint { incomingConnections.clear(); } } + + private void register(SocketChannel channel) throws IOException { + NetworkUtil.configure(channel); + channel.configureBlocking(false); + } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/4085b48f/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java new file mode 100644 index 0000000..f9f45c1 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/NetworkUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.util; + +import java.io.IOException; +import java.net.StandardSocketOptions; +import java.nio.channels.SocketChannel; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class NetworkUtil { + + private static final Logger LOGGER = LogManager.getLogger(); + + private NetworkUtil() { + } + + public static void configure(SocketChannel sc) throws IOException { + sc.setOption(StandardSocketOptions.TCP_NODELAY, true); + sc.setOption(StandardSocketOptions.SO_KEEPALIVE, true); + } + + public static void closeQuietly(SocketChannel sc) { + if (sc.isOpen()) { + try { + sc.close(); + } catch (IOException e) { + LOGGER.warn("Failed to close socket", e); + } + } + } +}