http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/NettyFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java new file mode 100644 index 0000000..13d8810 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java @@ -0,0 +1,375 @@ +package org.apache.cassandra.net.async; + +import java.net.InetSocketAddress; +import java.util.zip.Checksum; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.compression.Lz4FrameDecoder; +import io.netty.handler.codec.compression.Lz4FrameEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.OpenSsl; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.DefaultEventExecutor; +import 
io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.xxhash.XXHashFactory; +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.security.SSLFactory; +import org.apache.cassandra.service.NativeTransportService; +import org.apache.cassandra.utils.ChecksumType; +import org.apache.cassandra.utils.CoalescingStrategies; +import org.apache.cassandra.utils.FBUtilities; + +/** + * A factory for building Netty {@link Channel}s. Channels here are setup with a pipeline to participate + * in the internode protocol handshake, either the inbound or outbound side as per the method invoked. + */ +public final class NettyFactory +{ + private static final Logger logger = LoggerFactory.getLogger(NettyFactory.class); + + /** + * The block size for use with netty's lz4 code. 
+ */ + private static final int COMPRESSION_BLOCK_SIZE = 1 << 16; + + private static final int LZ4_HASH_SEED = 0x9747b28c; + + public enum Mode { MESSAGING, STREAMING } + + private static final String SSL_CHANNEL_HANDLER_NAME = "ssl"; + static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor"; + static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor"; + private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + + /** a useful addition for debugging; simply set to true to get more data in your logs */ + private static final boolean WIRETRACE = false; + static + { + if (WIRETRACE) + InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); + } + + private static final boolean DEFAULT_USE_EPOLL = NativeTransportService.useEpoll(); + static + { + if (!DEFAULT_USE_EPOLL) + logger.warn("epoll not availble {}", Epoll.unavailabilityCause()); + } + + /** + * The size of the receive queue for the outbound channels. As outbound channels do not receive data + * (outside of the internode messaging protocol's handshake), this value can be relatively small. + */ + private static final int OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE = 1 << 10; + + /** + * The size of the send queue for the inbound channels. As inbound channels do not send data + * (outside of the internode messaging protocol's handshake), this value can be relatively small. + */ + private static final int INBOUND_CHANNEL_SEND_BUFFER_SIZE = 1 << 10; + + /** + * A factory instance that all normal, runtime code should use. Separate instances should only be used for testing. + */ + public static final NettyFactory instance = new NettyFactory(DEFAULT_USE_EPOLL); + + private final boolean useEpoll; + private final EventLoopGroup acceptGroup; + + private final EventLoopGroup inboundGroup; + private final EventLoopGroup outboundGroup; + + /** + * Constructor that allows modifying the {@link NettyFactory#useEpoll} for testing purposes. 
Otherwise, use the + * default {@link #instance}. + */ + @VisibleForTesting + NettyFactory(boolean useEpoll) + { + this.useEpoll = useEpoll; + acceptGroup = getEventLoopGroup(useEpoll, determineAcceptGroupSize(DatabaseDescriptor.getServerEncryptionOptions().internode_encryption), + "MessagingService-NettyAcceptor-Threads", false); + inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Threads", false); + outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Threads", true); + } + + /** + * Determine the number of accept threads we need, which is based upon the number of listening sockets we will have. + * We'll have either 1 or 2 listen sockets, depending on if we use SSL or not in combination with non-SSL. If we have both, + * we'll have two sockets, and thus need two threads; else one socket and one thread. + * + * If the operator has configured multiple IP addresses (both {@link org.apache.cassandra.config.Config#broadcast_address} + * and {@link org.apache.cassandra.config.Config#listen_address} are configured), then we listen on another set of sockets + * - basically doubling the count. See CASSANDRA-9748 for more details. + */ + static int determineAcceptGroupSize(InternodeEncryption internode_encryption) + { + int listenSocketCount = internode_encryption == InternodeEncryption.dc || internode_encryption == InternodeEncryption.rack ? 2 : 1; + + if (MessagingService.shouldListenOnBroadcastAddress()) + listenSocketCount *= 2; + + return listenSocketCount; + } + + /** + * Create an {@link EventLoopGroup}, for epoll or nio. The {@code boostIoRatio} flag passes a hint to the netty + * event loop threads to optimize comsuming all the tasks from the netty channel before checking for IO activity. 
+ * By default, netty will process some maximum number of tasks off it's queue before it will check for activity on + * any of the open FDs, which basically amounts to checking for any incoming data. If you have a class of event loops + * that that do almost *no* inbound activity (like cassandra's outbound channels), then it behooves us to have the + * outbound netty channel consume as many tasks as it can before making the system calls to check up on the FDs, + * as we're not expecting any incoming data on those sockets, anyways. Thus, we pass the magic value {@code 100} + * to achieve the maximum consuption from the netty queue. (for implementation details, as of netty 4.1.8, + * see {@link io.netty.channel.epoll.EpollEventLoop#run()}. + */ + static EventLoopGroup getEventLoopGroup(boolean useEpoll, int threadCount, String threadNamePrefix, boolean boostIoRatio) + { + if (useEpoll) + { + logger.debug("using netty epoll event loop for pool prefix {}", threadNamePrefix); + EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix)); + if (boostIoRatio) + eventLoopGroup.setIoRatio(100); + return eventLoopGroup; + } + + logger.debug("using netty nio event loop for pool prefix {}", threadNamePrefix); + NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix)); + if (boostIoRatio) + eventLoopGroup.setIoRatio(100); + return eventLoopGroup; + } + + /** + * Create a {@link Channel} that listens on the {@code localAddr}. This method will block while trying to bind to the address, + * but it does not make a remote call. + */ + public Channel createInboundChannel(InetSocketAddress localAddr, InboundInitializer initializer, int receiveBufferSize) throws ConfigurationException + { + String nic = FBUtilities.getNetworkInterface(localAddr.getAddress()); + logger.info("Starting Messaging Service on {} {}, encryption: {}", + localAddr, nic == null ? 
"" : String.format(" (%s)", nic), encryptionLogStatement(initializer.encryptionOptions)); + Class<? extends ServerChannel> transport = useEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class; + ServerBootstrap bootstrap = new ServerBootstrap().group(acceptGroup, inboundGroup) + .channel(transport) + .option(ChannelOption.SO_BACKLOG, 500) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.SO_SNDBUF, INBOUND_CHANNEL_SEND_BUFFER_SIZE) + .childHandler(initializer); + + if (receiveBufferSize > 0) + bootstrap.childOption(ChannelOption.SO_RCVBUF, receiveBufferSize); + + ChannelFuture channelFuture = bootstrap.bind(localAddr); + + if (!channelFuture.awaitUninterruptibly().isSuccess()) + { + if (channelFuture.channel().isOpen()) + channelFuture.channel().close(); + + Throwable failedChannelCause = channelFuture.cause(); + + String causeString = ""; + if (failedChannelCause != null && failedChannelCause.getMessage() != null) + causeString = failedChannelCause.getMessage(); + + if (causeString.contains("in use")) + { + throw new ConfigurationException(localAddr + " is in use by another process. Change listen_address:storage_port " + + "in cassandra.yaml to values that do not conflict with other services"); + } + // looking at the jdk source, solaris/windows bind failue messages both use the phrase "cannot assign requested address". + // windows message uses "Cannot" (with a capital 'C'), and solaris (a/k/a *nux) doe not. hence we search for "annot" <sigh> + else if (causeString.contains("annot assign requested address")) + { + throw new ConfigurationException("Unable to bind to address " + localAddr + + ". 
Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2"); + } + else + { + throw new ConfigurationException("failed to bind to: " + localAddr, failedChannelCause); + } + } + + return channelFuture.channel(); + } + + public static class InboundInitializer extends ChannelInitializer<SocketChannel> + { + private final IInternodeAuthenticator authenticator; + private final ServerEncryptionOptions encryptionOptions; + private final ChannelGroup channelGroup; + + public InboundInitializer(IInternodeAuthenticator authenticator, ServerEncryptionOptions encryptionOptions, ChannelGroup channelGroup) + { + this.authenticator = authenticator; + this.encryptionOptions = encryptionOptions; + this.channelGroup = channelGroup; + } + + @Override + public void initChannel(SocketChannel channel) throws Exception + { + channelGroup.add(channel); + ChannelPipeline pipeline = channel.pipeline(); + + // order of handlers: ssl -> logger -> handshakeHandler + if (encryptionOptions != null) + { + SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true); + SslHandler sslHandler = sslContext.newHandler(channel.alloc()); + logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); + pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); } + + if (WIRETRACE) + pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO)); + + channel.pipeline().addLast(HANDSHAKE_HANDLER_NAME, new InboundHandshakeHandler(authenticator)); + } + } + + private String encryptionLogStatement(ServerEncryptionOptions options) + { + if (options == null) + return "disabled"; + + String encryptionType = OpenSsl.isAvailable() ? "openssl" : "jdk"; + return "enabled (" + encryptionType + ')'; + } + + /** + * Create the {@link Bootstrap} for connecting to a remote peer. This method does <b>not</b> attempt to connect to the peer, + * and thus does not block. 
+ */ + public Bootstrap createOutboundBootstrap(OutboundConnectionParams params) + { + logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", params.connectionId.connectionAddress(), + params.compress, encryptionLogStatement(params.encryptionOptions), + params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED); + Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class; + Bootstrap bootstrap = new Bootstrap().group(outboundGroup) + .channel(transport) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_SNDBUF, params.sendBufferSize) + .option(ChannelOption.SO_RCVBUF, OUTBOUND_CHANNEL_RECEIVE_BUFFER_SIZE) + .option(ChannelOption.TCP_NODELAY, params.tcpNoDelay) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, params.waterMark) + .handler(new OutboundInitializer(params)); + bootstrap.localAddress(params.connectionId.local(), 0); + bootstrap.remoteAddress(params.connectionId.connectionAddress()); + return bootstrap; + } + + public static class OutboundInitializer extends ChannelInitializer<SocketChannel> + { + private final OutboundConnectionParams params; + + OutboundInitializer(OutboundConnectionParams params) + { + this.params = params; + } + + public void initChannel(SocketChannel channel) throws Exception + { + ChannelPipeline pipeline = channel.pipeline(); + + // order of handlers: ssl -> logger -> handshakeHandler + if (params.encryptionOptions != null) + { + SslContext sslContext = SSLFactory.getSslContext(params.encryptionOptions, true, false); + + final SslHandler sslHandler; + if (params.encryptionOptions.require_endpoint_verification) + { + InetSocketAddress peer = params.connectionId.remoteAddress(); + sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort()); + 
SSLEngine engine = sslHandler.engine(); + SSLParameters sslParameters = engine.getSSLParameters(); + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + engine.setSSLParameters(sslParameters); + } + else + { + sslHandler = sslContext.newHandler(channel.alloc()); + } + + logger.trace("creating outbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); + pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); + } + + if (NettyFactory.WIRETRACE) + pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO)); + + pipeline.addLast(HANDSHAKE_HANDLER_NAME, new OutboundHandshakeHandler(params)); + } + } + + public void close() + { + acceptGroup.shutdownGracefully(); + outboundGroup.shutdownGracefully(); + inboundGroup.shutdownGracefully(); + } + + static Lz4FrameEncoder createLz4Encoder(int protocolVersion) + { + return new Lz4FrameEncoder(LZ4Factory.fastestInstance(), false, COMPRESSION_BLOCK_SIZE, checksumForFrameEncoders(protocolVersion)); + } + + private static Checksum checksumForFrameEncoders(int protocolVersion) + { + if (protocolVersion >= MessagingService.current_version) + return ChecksumType.CRC32.newInstance(); + return XXHashFactory.fastestInstance().newStreamingHash32(LZ4_HASH_SEED).asChecksum(); + } + + static Lz4FrameDecoder createLz4Decoder(int protocolVersion) + { + return new Lz4FrameDecoder(LZ4Factory.fastestInstance(), checksumForFrameEncoders(protocolVersion)); + } + + public static EventExecutor executorForChannelGroups() + { + return new DefaultEventExecutor(); + } +}
/**
 * Identifies an outbound messaging connection.
 *
 * An identifier carries the remote address and the kind of connection (small messages, large messages or gossip).
 * It additionally distinguishes the address by which the remote node is known from the address actually used to
 * connect to it, as the two can differ (typically public EC2 addresses across regions).
 */
public class OutboundConnectionIdentifier
{
    enum ConnectionType
    {
        GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE
    }

    /**
     * Address of the local node as supplied by the caller (noted by the original author as a memoization of the
     * local node's broadcast address).
     */
    private final InetSocketAddress localAddr;

    /**
     * Address that identifies the remote node. It may differ from {@link #remoteConnectionAddr}, e.g. when an
     * EC2 public IP address has to be used to communicate between EC2 regions.
     */
    private final InetSocketAddress remoteAddr;

    /**
     * Address used to actually connect to the remote node; often, but not always, equal to {@link #remoteAddr}.
     */
    private final InetSocketAddress remoteConnectionAddr;

    private final ConnectionType connectionType;

    private OutboundConnectionIdentifier(InetSocketAddress local,
                                         InetSocketAddress remote,
                                         InetSocketAddress connectTo,
                                         ConnectionType type)
    {
        localAddr = local;
        remoteAddr = remote;
        remoteConnectionAddr = connectTo;
        connectionType = type;
    }

    /**
     * Builds an identifier for a small message connection whose connection address is the remote "identifying"
     * address itself.
     */
    public static OutboundConnectionIdentifier small(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
    {
        return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteAddr, ConnectionType.SMALL_MESSAGE);
    }

    /**
     * Builds an identifier for a large message connection whose connection address is the remote "identifying"
     * address itself.
     */
    public static OutboundConnectionIdentifier large(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
    {
        return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteAddr, ConnectionType.LARGE_MESSAGE);
    }

    /**
     * Builds an identifier for a gossip connection whose connection address is the remote "identifying"
     * address itself.
     */
    public static OutboundConnectionIdentifier gossip(InetSocketAddress localAddr, InetSocketAddress remoteAddr)
    {
        return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteAddr, ConnectionType.GOSSIP);
    }

    /**
     * Derives a new identifier for the same remote and connection type, but connecting through another address.
     *
     * @param remoteConnectionAddr the address the new identifier uses to connect to the remote.
     * @return a new identifier equal to this one except for its connection address, which is
     * {@code remoteConnectionAddr}.
     */
    OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr)
    {
        return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType);
    }

    /**
     * The IP address part of the local node address.
     */
    InetAddress local()
    {
        return localAddr.getAddress();
    }

    /**
     * The identifying socket address of the remote node (for every use except connecting to it).
     */
    InetSocketAddress remoteAddress()
    {
        return remoteAddr;
    }

    /**
     * The IP address part of the remote node's identifying address.
     */
    InetAddress remote()
    {
        return remoteAddr.getAddress();
    }

    /**
     * The socket address to use when actually connecting to the remote node, and only then.
     */
    InetSocketAddress connectionAddress()
    {
        return remoteConnectionAddr;
    }

    /**
     * The kind of connection this identifier stands for.
     */
    ConnectionType type()
    {
        return connectionType;
    }

    /**
     * Renders the identifying address and connection type; the connection address is included only when it
     * differs from the identifying address.
     */
    @Override
    public String toString()
    {
        if (remoteAddr.equals(remoteConnectionAddr))
            return String.format("%s (%s)", remoteAddr, connectionType);

        return String.format("%s on %s (%s)", remoteAddr, remoteConnectionAddr, connectionType);
    }
}
+ */ + +package org.apache.cassandra.net.async; + +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.google.common.base.Preconditions; + +import io.netty.channel.WriteBufferWaterMark; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult; +import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy; + +/** + * A collection of data points to be passed around for outbound connections. + */ +public class OutboundConnectionParams +{ + public static final int DEFAULT_SEND_BUFFER_SIZE = 1 << 16; + + final OutboundConnectionIdentifier connectionId; + final Consumer<HandshakeResult> callback; + final ServerEncryptionOptions encryptionOptions; + final NettyFactory.Mode mode; + final boolean compress; + final Optional<CoalescingStrategy> coalescingStrategy; + final int sendBufferSize; + final boolean tcpNoDelay; + final Supplier<QueuedMessage> backlogSupplier; + final Consumer<MessageResult> messageResultConsumer; + final WriteBufferWaterMark waterMark; + final int protocolVersion; + + private OutboundConnectionParams(OutboundConnectionIdentifier connectionId, + Consumer<HandshakeResult> callback, + ServerEncryptionOptions encryptionOptions, + NettyFactory.Mode mode, + boolean compress, + Optional<CoalescingStrategy> coalescingStrategy, + int sendBufferSize, + boolean tcpNoDelay, + Supplier<QueuedMessage> backlogSupplier, + Consumer<MessageResult> messageResultConsumer, + WriteBufferWaterMark waterMark, + int protocolVersion) + { + this.connectionId = connectionId; + this.callback = callback; + this.encryptionOptions = encryptionOptions; + this.mode = mode; + this.compress = compress; + this.coalescingStrategy = coalescingStrategy; + this.sendBufferSize = sendBufferSize; + this.tcpNoDelay = tcpNoDelay; + this.backlogSupplier = backlogSupplier; + this.messageResultConsumer = 
messageResultConsumer; + this.waterMark = waterMark; + this.protocolVersion = protocolVersion; + } + + public static Builder builder() + { + return new Builder(); + } + + public static Builder builder(OutboundConnectionParams params) + { + return new Builder(params); + } + + public static class Builder + { + private OutboundConnectionIdentifier connectionId; + private Consumer<HandshakeResult> callback; + private ServerEncryptionOptions encryptionOptions; + private NettyFactory.Mode mode; + private boolean compress; + private Optional<CoalescingStrategy> coalescingStrategy = Optional.empty(); + private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; + private boolean tcpNoDelay; + private Supplier<QueuedMessage> backlogSupplier; + private Consumer<MessageResult> messageResultConsumer; + private WriteBufferWaterMark waterMark = WriteBufferWaterMark.DEFAULT; + int protocolVersion; + + private Builder() + { } + + private Builder(OutboundConnectionParams params) + { + this.connectionId = params.connectionId; + this.callback = params.callback; + this.encryptionOptions = params.encryptionOptions; + this.mode = params.mode; + this.compress = params.compress; + this.coalescingStrategy = params.coalescingStrategy; + this.sendBufferSize = params.sendBufferSize; + this.tcpNoDelay = params.tcpNoDelay; + this.backlogSupplier = params.backlogSupplier; + this.messageResultConsumer = params.messageResultConsumer; + } + + public Builder connectionId(OutboundConnectionIdentifier connectionId) + { + this.connectionId = connectionId; + return this; + } + + public Builder callback(Consumer<HandshakeResult> callback) + { + this.callback = callback; + return this; + } + + public Builder encryptionOptions(ServerEncryptionOptions encryptionOptions) + { + this.encryptionOptions = encryptionOptions; + return this; + } + + public Builder mode(NettyFactory.Mode mode) + { + this.mode = mode; + return this; + } + + public Builder compress(boolean compress) + { + this.compress = compress; + return 
this; + } + + public Builder coalescingStrategy(Optional<CoalescingStrategy> coalescingStrategy) + { + this.coalescingStrategy = coalescingStrategy; + return this; + } + + public Builder sendBufferSize(int sendBufferSize) + { + this.sendBufferSize = sendBufferSize; + return this; + } + + public Builder tcpNoDelay(boolean tcpNoDelay) + { + this.tcpNoDelay = tcpNoDelay; + return this; + } + + public Builder backlogSupplier(Supplier<QueuedMessage> backlogSupplier) + { + this.backlogSupplier = backlogSupplier; + return this; + } + + public Builder messageResultConsumer(Consumer<MessageResult> messageResultConsumer) + { + this.messageResultConsumer = messageResultConsumer; + return this; + } + + public Builder waterMark(WriteBufferWaterMark waterMark) + { + this.waterMark = waterMark; + return this; + } + + public Builder protocolVersion(int protocolVersion) + { + this.protocolVersion = protocolVersion; + return this; + } + + public OutboundConnectionParams build() + { + Preconditions.checkArgument(protocolVersion > 0, "illegal protocol version: " + protocolVersion); + Preconditions.checkArgument(sendBufferSize > 0 && sendBufferSize < 1 << 20, "illegal send buffer size: " + sendBufferSize); + + return new OutboundConnectionParams(connectionId, callback, encryptionOptions, mode, compress, coalescingStrategy, sendBufferSize, + tcpNoDelay, backlogSupplier, messageResultConsumer, waterMark, protocolVersion); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java new file mode 100644 index 0000000..703549a --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.cassandra.net.async;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage;
import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage;
import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage;

import static org.apache.cassandra.config.Config.PROPERTY_PREFIX;

/**
 * A {@link ChannelHandler} to execute the send-side of the internode communication handshake protocol.
 * As soon as the handler is added to the channel via {@link #channelActive(ChannelHandlerContext)}
 * (which is only invoked if the underlying TCP connection was properly established), the {@link FirstHandshakeMessage}
 * of the internode messaging protocol is automatically sent out. See {@link HandshakeProtocol} for full details
 * about the internode messaging handshake protocol.
 * <p>
 * Upon completion of the handshake (on success or fail), the {@link #callback} is invoked to let the listener
 * know the result of the handshake. See {@link HandshakeResult} for details about the different result states.
 * <p>
 * This class extends {@link ByteToMessageDecoder}, which is a {@link ChannelInboundHandler}, because this handler
 * waits for the peer's handshake response (the {@link SecondHandshakeMessage} of the internode messaging handshake protocol).
 */
public class OutboundHandshakeHandler extends ByteToMessageDecoder
{
    private static final Logger logger = LoggerFactory.getLogger(OutboundHandshakeHandler.class);

    /**
     * The number of milliseconds to wait before closing a channel if there has been no progress (when there is
     * data to be sent). See {@link IdleStateHandler} and {@link MessageOutHandler#userEventTriggered(ChannelHandlerContext, Object)}.
     */
    private static final long DEFAULT_WRITE_IDLE_MS = TimeUnit.SECONDS.toMillis(10);
    private static final String WRITE_IDLE_PROPERTY = PROPERTY_PREFIX + "outbound_write_idle_ms";
    private static final long WRITE_IDLE_MS = Long.getLong(WRITE_IDLE_PROPERTY, DEFAULT_WRITE_IDLE_MS);

    private final OutboundConnectionIdentifier connectionId;

    /**
     * The expected messaging service version to use.
     */
    private final int messagingVersion;

    /**
     * A function to invoke upon completion of the attempt, success or failure, to connect to the peer.
     */
    private final Consumer<HandshakeResult> callback;
    private final NettyFactory.Mode mode;
    private final OutboundConnectionParams params;

    /**
     * Captures the connection identifier, expected protocol version, result callback and mode from {@code params};
     * the {@code params} themselves are retained for the later pipeline setup.
     */
    OutboundHandshakeHandler(OutboundConnectionParams params)
    {
        this.params = params;
        this.connectionId = params.connectionId;
        this.messagingVersion = params.protocolVersion;
        this.callback = params.callback;
        this.mode = params.mode;
    }

    /**
     * {@inheritDoc}
     *
     * Invoked when the channel is made active, and sends out the {@link FirstHandshakeMessage}
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception
    {
        FirstHandshakeMessage msg = new FirstHandshakeMessage(messagingVersion, mode, params.compress);
        logger.trace("starting handshake with peer {}, msg = {}", connectionId.connectionAddress(), msg);
        ctx.writeAndFlush(msg.encode(ctx.alloc())).addListener(future -> firstHandshakeMessageListener(future, ctx));
        ctx.fireChannelActive();
    }

    /**
     * A simple listener to make sure we could send the {@link FirstHandshakeMessage} to the socket,
     * and fail the handshake attempt if we could not (for example, maybe we could create the TCP socket, but then
     * the connection gets closed for some reason).
     *
     * @param future the completed write future for the first handshake message
     * @param ctx the context of the channel the message was written to
     */
    void firstHandshakeMessageListener(Future<? super Void> future, ChannelHandlerContext ctx)
    {
        if (future.isSuccess())
            return;

        // cause() is declared on Future itself, so no downcast to ChannelFuture is needed
        // (and a downcast would throw ClassCastException for any other Future implementation).
        exceptionCaught(ctx, future.cause());
    }

    /**
     * {@inheritDoc}
     *
     * Invoked when we get the response back from the peer, which should contain the second message of the internode messaging handshake.
     * <p>
     * If the peer's protocol version does not equal what we were expecting, immediately close the channel (and socket);
     * do *not* send out the third message of the internode messaging handshake.
     * We will reconnect on the appropriate protocol version.
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
    {
        SecondHandshakeMessage msg = SecondHandshakeMessage.maybeDecode(in);
        // a null result means no message could be decoded from the buffer yet; wait for the next invocation
        if (msg == null)
            return;

        logger.trace("received second handshake message from peer {}, msg = {}", connectionId.connectionAddress(), msg);
        final int peerMessagingVersion = msg.messagingVersion;

        // we expected a higher protocol version, but it was actually lower
        if (messagingVersion > peerMessagingVersion)
        {
            logger.trace("peer's max version is {}; will reconnect with that version", peerMessagingVersion);
            try
            {
                if (DatabaseDescriptor.getSeeds().contains(connectionId.remote()))
                    logger.warn("Seed gossip version is {}; will not connect with that version", peerMessagingVersion);
            }
            catch (Throwable e)
            {
                // If invalid yaml has been added to the config since startup, getSeeds() will throw an AssertionError
                // Additionally, third party seed providers may throw exceptions if network is flakey.
                // Regardless of what's thrown, we must catch it, disconnect, and try again
                logger.warn("failed to reread yaml (on trying to connect to a seed): {}", e.getLocalizedMessage());
            }
            ctx.close();
            callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
            return;
        }
        // we anticipate a version that is lower than what peer is actually running
        else if (messagingVersion < peerMessagingVersion && messagingVersion < MessagingService.current_version)
        {
            logger.trace("peer has a higher max version than expected {} (previous value {})", peerMessagingVersion, messagingVersion);
            ctx.close();
            callback.accept(HandshakeResult.disconnect(peerMessagingVersion));
            return;
        }

        try
        {
            ctx.writeAndFlush(new ThirdHandshakeMessage(MessagingService.current_version, connectionId.local()).encode(ctx.alloc()));
            ChannelWriter channelWriter = setupPipeline(ctx.channel(), peerMessagingVersion);
            callback.accept(HandshakeResult.success(channelWriter, peerMessagingVersion));
        }
        catch (Exception e)
        {
            logger.info("failed to finalize internode messaging handshake", e);
            ctx.close();
            callback.accept(HandshakeResult.failed());
        }
    }

    /**
     * Installs the post-handshake handlers on the channel's pipeline (write-idle detection, optional LZ4
     * compression and the message-out handler) and removes this handshake handler from it.
     *
     * @param channel the channel whose pipeline is reconfigured
     * @param messagingVersion the messaging version passed to the compressor and the message-out handler
     * @return the {@link ChannelWriter} created for {@code channel}
     */
    @VisibleForTesting
    ChannelWriter setupPipeline(Channel channel, int messagingVersion)
    {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("idleWriteHandler", new IdleStateHandler(true, 0, WRITE_IDLE_MS, 0, TimeUnit.MILLISECONDS));
        if (params.compress)
            pipeline.addLast(NettyFactory.OUTBOUND_COMPRESSOR_HANDLER_NAME, NettyFactory.createLz4Encoder(messagingVersion));

        ChannelWriter channelWriter = ChannelWriter.create(channel, params.messageResultConsumer, params.coalescingStrategy);
        pipeline.addLast("messageOutHandler", new MessageOutHandler(connectionId, messagingVersion, channelWriter, params.backlogSupplier));
        pipeline.remove(this);
        return channelWriter;
    }

    /**
     * Logs the failure, closes the channel and reports a {@link HandshakeResult#failed()} result to the callback.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    {
        logger.error("Failed to properly handshake with peer {}. Closing the channel.", connectionId, cause);
        ctx.close();
        callback.accept(HandshakeResult.failed());
    }

    /**
     * The result of the handshake. Handshake has 3 possible outcomes:
     *  1) it can be successful, in which case the channel and version to be used are returned in this result.
     *  2) we may decide to disconnect to reconnect with another protocol version (namely, the version is passed in this result).
     *  3) we can have a negotiation failure for an unknown reason. (#sadtrombone)
     */
    public static class HandshakeResult
    {
        static final int UNKNOWN_PROTOCOL_VERSION = -1;

        /**
         * Describes the result of receiving the response back from the peer (Message 2 of the handshake)
         * and implies an action that should be taken.
         */
        enum Outcome
        {
            SUCCESS, DISCONNECT, NEGOTIATION_FAILURE
        }

        /** The channel for the connection, only set for successful handshake. */
        final ChannelWriter channelWriter;
        /** The version negotiated with the peer. Set unless this is a {@link Outcome#NEGOTIATION_FAILURE}. */
        final int negotiatedMessagingVersion;
        /** The handshake {@link Outcome}. */
        final Outcome outcome;

        private HandshakeResult(ChannelWriter channelWriter, int negotiatedMessagingVersion, Outcome outcome)
        {
            this.channelWriter = channelWriter;
            this.negotiatedMessagingVersion = negotiatedMessagingVersion;
            this.outcome = outcome;
        }

        /** Builds a {@link Outcome#SUCCESS} result carrying the writer and the negotiated version. */
        static HandshakeResult success(ChannelWriter channel, int negotiatedMessagingVersion)
        {
            return new HandshakeResult(channel, negotiatedMessagingVersion, Outcome.SUCCESS);
        }

        /** Builds a {@link Outcome#DISCONNECT} result carrying the version to reconnect with; no writer is set. */
        static HandshakeResult disconnect(int negotiatedMessagingVersion)
        {
            return new HandshakeResult(null, negotiatedMessagingVersion, Outcome.DISCONNECT);
        }

        /** Builds a {@link Outcome#NEGOTIATION_FAILURE} result with no writer and {@link #UNKNOWN_PROTOCOL_VERSION}. */
        static HandshakeResult failed()
        {
            return new HandshakeResult(null, UNKNOWN_PROTOCOL_VERSION, Outcome.NEGOTIATION_FAILURE);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java new file mode 100644 index 0000000..6bda9cd --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingConnection.java @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.async.NettyFactory.Mode; +import org.apache.cassandra.net.async.OutboundHandshakeHandler.HandshakeResult; +import org.apache.cassandra.utils.CoalescingStrategies; +import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.NoSpamLogger; + +/** + * Represents one connection to a peer, and handles the state transistions on the connection and the netty {@link Channel} + * The underlying socket is not opened until explicitly requested (by sending a message). + * + * The basic setup for the channel is like this: a message is requested to be sent via {@link #sendMessage(MessageOut, int)}. 
+ * If the channel is not established, then we need to create it (obviously). To prevent multiple threads from creating + * independent connections, they attempt to update the {@link #state}; one thread will win the race and create the connection. + * Upon sucessfully setting up the connection/channel, the {@link #state} will be updated again (to {@link State#READY}, + * which indicates to other threads that the channel is ready for business and can be written to. + * + */ +public class OutboundMessagingConnection +{ + static final Logger logger = LoggerFactory.getLogger(OutboundMessagingConnection.class); + private static final NoSpamLogger errorLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS); + + private static final String INTRADC_TCP_NODELAY_PROPERTY = Config.PROPERTY_PREFIX + "otc_intradc_tcp_nodelay"; + + /** + * Enabled/disable TCP_NODELAY for intradc connections. Defaults to enabled. + */ + private static final boolean INTRADC_TCP_NODELAY = Boolean.parseBoolean(System.getProperty(INTRADC_TCP_NODELAY_PROPERTY, "true")); + + /** + * Number of milliseconds between connection createRetry attempts. + */ + private static final int OPEN_RETRY_DELAY_MS = 100; + + /** + * A minimum number of milliseconds to wait for a connection (TCP socket connect + handshake) + */ + private static final int MINIMUM_CONNECT_TIMEOUT_MS = 2000; + private final IInternodeAuthenticator authenticator; + + /** + * Describes this instance's ability to send messages into it's Netty {@link Channel}. + */ + enum State + { + /** waiting to create the connection */ + NOT_READY, + /** we've started to create the connection/channel */ + CREATING_CHANNEL, + /** channel is established and we can send messages */ + READY, + /** a dead state which should not be transitioned away from */ + CLOSED + } + + /** + * Backlog to hold messages passed by upstream threads while the Netty {@link Channel} is being set up or recreated. 
+ */ + private final Queue<QueuedMessage> backlog; + + /** + * Reference to a {@link ScheduledExecutorService} rther than directly depending on something like {@link ScheduledExecutors}. + */ + private final ScheduledExecutorService scheduledExecutor; + + final AtomicLong droppedMessageCount; + final AtomicLong completedMessageCount; + + private volatile OutboundConnectionIdentifier connectionId; + + private final ServerEncryptionOptions encryptionOptions; + + /** + * A future for retrying connections. Bear in mind that this future does not execute in the + * netty event event loop, so there's some races to be careful of. + */ + private volatile ScheduledFuture<?> connectionRetryFuture; + + /** + * A future for notifying when the timeout for creating the connection and negotiating the handshake has elapsed. + * It will be cancelled when the channel is established correctly. Bear in mind that this future does not execute in the + * netty event event loop, so there's some races to be careful of. + */ + private volatile ScheduledFuture<?> connectionTimeoutFuture; + + private final AtomicReference<State> state; + + private final Optional<CoalescingStrategy> coalescingStrategy; + + /** + * A running count of the number of times we've tried to create a connection. + */ + private volatile int connectAttemptCount; + + /** + * The netty channel, once a socket connection is established; it won't be in it's normal working state until the handshake is complete. 
+ */ + private volatile ChannelWriter channelWriter; + + /** + * the target protocol version to communicate to the peer with, discovered/negotiated via handshaking + */ + private int targetVersion; + + OutboundMessagingConnection(OutboundConnectionIdentifier connectionId, + ServerEncryptionOptions encryptionOptions, + Optional<CoalescingStrategy> coalescingStrategy, + IInternodeAuthenticator authenticator) + { + this(connectionId, encryptionOptions, coalescingStrategy, authenticator, ScheduledExecutors.scheduledFastTasks); + } + + @VisibleForTesting + OutboundMessagingConnection(OutboundConnectionIdentifier connectionId, + ServerEncryptionOptions encryptionOptions, + Optional<CoalescingStrategy> coalescingStrategy, + IInternodeAuthenticator authenticator, + ScheduledExecutorService sceduledExecutor) + { + this.connectionId = connectionId; + this.encryptionOptions = encryptionOptions; + this.authenticator = authenticator; + backlog = new ConcurrentLinkedQueue<>(); + droppedMessageCount = new AtomicLong(0); + completedMessageCount = new AtomicLong(0); + state = new AtomicReference<>(State.NOT_READY); + this.scheduledExecutor = sceduledExecutor; + this.coalescingStrategy = coalescingStrategy; + + // We want to use the most precise protocol version we know because while there is version detection on connect(), + // the target version might be accessed by the pool (in getConnection()) before we actually connect (as we + // only connect when the first message is submitted). Note however that the only case where we'll connect + // without knowing the true version of a node is if that node is a seed (otherwise, we can't know a node + // unless it has been gossiped to us or it has connected to us, and in both cases that will set the version). + // In that case we won't rely on that targetVersion before we're actually connected and so the version + // detection in connect() will do its job. 
+ targetVersion = MessagingService.instance().getVersion(connectionId.remote()); + } + + /** + * If the connection is set up and ready to use (the normal case), simply send the message to it and return. + * Otherwise, one lucky thread is selected to create the Channel, while other threads just add the {@code msg} to + * the backlog queue. + * + * @return true if the message was accepted by the {@link #channelWriter}; else false if it was not accepted + * and added to the backlog or the channel is {@link State#CLOSED}. See documentation in {@link ChannelWriter} and + * {@link MessageOutHandler} how the backlogged messages get consumed. + */ + boolean sendMessage(MessageOut msg, int id) + { + return sendMessage(new QueuedMessage(msg, id)); + } + + boolean sendMessage(QueuedMessage queuedMessage) + { + State state = this.state.get(); + if (state == State.READY) + { + if (channelWriter.write(queuedMessage, false)) + return true; + + backlog.add(queuedMessage); + return false; + } + else if (state == State.CLOSED) + { + errorLogger.warn("trying to write message to a closed connection"); + return false; + } + else + { + backlog.add(queuedMessage); + connect(); + return true; + } + } + + /** + * Initiate all the actions required to establish a working, valid connection. This includes + * opening the socket, negotiating the internode messaging handshake, and setting up the working + * Netty {@link Channel}. However, this method will not block for all those actions: it will only + * kick off the connection attempt as everything is asynchronous. + * <p> + * Threads compete to update the {@link #state} field to {@link State#CREATING_CHANNEL} to ensure only one + * connection is attempted at a time. + * + * @return true if kicking off the connection attempt was started by this thread; else, false. 
+ */ + public boolean connect() + { + // try to be the winning thread to create the channel + if (!state.compareAndSet(State.NOT_READY, State.CREATING_CHANNEL)) + return false; + + // clean up any lingering connection attempts + if (connectionTimeoutFuture != null) + { + connectionTimeoutFuture.cancel(false); + connectionTimeoutFuture = null; + } + + return tryConnect(); + } + + private boolean tryConnect() + { + if (state.get() != State.CREATING_CHANNEL) + return false; + + logger.debug("connection attempt {} to {}", connectAttemptCount, connectionId); + + + InetSocketAddress remote = connectionId.remoteAddress(); + if (!authenticator.authenticate(remote.getAddress(), remote.getPort())) + { + logger.warn("Internode auth failed connecting to {}", connectionId); + //Remove the connection pool and other thread so messages aren't queued + MessagingService.instance().destroyConnectionPool(remote.getAddress()); + + // don't update the state field as destroyConnectionPool() *should* call OMC.close() + // on all the connections in the OMP for the remoteAddress + return false; + } + + boolean compress = shouldCompressConnection(connectionId.local(), connectionId.remote()); + Bootstrap bootstrap = buildBootstrap(compress); + + ChannelFuture connectFuture = bootstrap.connect(); + connectFuture.addListener(this::connectCallback); + + long timeout = Math.max(MINIMUM_CONNECT_TIMEOUT_MS, DatabaseDescriptor.getRpcTimeout()); + if (connectionTimeoutFuture == null || connectionTimeoutFuture.isDone()) + connectionTimeoutFuture = scheduledExecutor.schedule(() -> connectionTimeout(connectFuture), timeout, TimeUnit.MILLISECONDS); + return true; + } + + @VisibleForTesting + static boolean shouldCompressConnection(InetAddress localHost, InetAddress remoteHost) + { + return (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all) + || ((DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc) && !isLocalDC(localHost, remoteHost)); + } + + private 
Bootstrap buildBootstrap(boolean compress) + { + boolean tcpNoDelay = isLocalDC(connectionId.local(), connectionId.remote()) ? INTRADC_TCP_NODELAY : DatabaseDescriptor.getInterDCTcpNoDelay(); + int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0 + ? DatabaseDescriptor.getInternodeSendBufferSize() + : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE; + OutboundConnectionParams params = OutboundConnectionParams.builder() + .connectionId(connectionId) + .callback(this::finishHandshake) + .encryptionOptions(encryptionOptions) + .mode(Mode.MESSAGING) + .compress(compress) + .coalescingStrategy(coalescingStrategy) + .sendBufferSize(sendBufferSize) + .tcpNoDelay(tcpNoDelay) + .backlogSupplier(() -> nextBackloggedMessage()) + .messageResultConsumer(this::handleMessageResult) + .protocolVersion(targetVersion) + .build(); + + return NettyFactory.instance.createOutboundBootstrap(params); + } + + private QueuedMessage nextBackloggedMessage() + { + QueuedMessage msg = backlog.poll(); + if (msg == null) + return null; + + if (!msg.isTimedOut()) + return msg; + + if (msg.shouldRetry()) + return msg.createRetry(); + + droppedMessageCount.incrementAndGet(); + return null; + } + + static boolean isLocalDC(InetAddress localHost, InetAddress remoteHost) + { + String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(remoteHost); + String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(localHost); + return remoteDC != null && remoteDC.equals(localDC); + } + + /** + * Handles the callback of the TCP connection attempt (not including the handshake negotiation!), and really all + * we're handling here is the TCP connection failures. On failure, we close the channel (which should disconnect + * the socket, if connected). If there was an {@link IOException} while trying to connect, the connection will be + * retried after a short delay. 
+ * <p> + * This method does not alter the {@link #state} as it's only evaluating the TCP connect, not TCP connect and handshake. + * Thus, {@link #finishHandshake(HandshakeResult)} will handle any necessary state updates. + * <p> + * Note: this method is called from the event loop, so be careful wrt thread visibility + * + * @return true iff the TCP connection was established and the {@link #state} is not {@link State#CLOSED}; else false. + */ + @VisibleForTesting + boolean connectCallback(Future<? super Void> future) + { + ChannelFuture channelFuture = (ChannelFuture)future; + + // make sure this instance is not (terminally) closed + if (state.get() == State.CLOSED) + { + channelFuture.channel().close(); + return false; + } + + // this is the success state + final Throwable cause = future.cause(); + if (cause == null) + { + connectAttemptCount = 0; + return true; + } + + setStateIfNotClosed(state, State.NOT_READY); + if (cause instanceof IOException) + { + logger.trace("unable to connect on attempt {} to {}", connectAttemptCount, connectionId, cause); + connectAttemptCount++; + connectionRetryFuture = scheduledExecutor.schedule(this::connect, OPEN_RETRY_DELAY_MS * connectAttemptCount, TimeUnit.MILLISECONDS); + } + else + { + JVMStabilityInspector.inspectThrowable(cause); + logger.error("non-IO error attempting to connect to {}", connectionId, cause); + } + return false; + } + + /** + * A callback for handling timeouts when creating a connection/negotiating the handshake. + * <p> + * Note: this method is *not* invoked from the netty event loop, + * so there's an inherent race with {@link #finishHandshake(HandshakeResult)}, + * as well as any possible connect() reattempts (a seemingly remote race condition, however). + * Therefore, this function tries to lose any races, as much as possible. + * + * @return true if there was a timeout on the connect/handshake; else false. 
+ */ + boolean connectionTimeout(ChannelFuture channelFuture) + { + if (connectionRetryFuture != null) + { + connectionRetryFuture.cancel(false); + connectionRetryFuture = null; + } + connectAttemptCount = 0; + State initialState = state.get(); + if (initialState == State.CLOSED) + return true; + + if (initialState != State.READY) + { + logger.debug("timed out while trying to connect to {}", connectionId); + + channelFuture.channel().close(); + // a last-ditch attempt to let finishHandshake() win the race + if (state.compareAndSet(initialState, State.NOT_READY)) + { + backlog.clear(); + return true; + } + } + return false; + } + + /** + * Process the results of the handshake negotiation. + * <p> + * Note: this method will be invoked from the netty event loop, + * so there's an inherent race with {@link #connectionTimeout(ChannelFuture)}. + */ + void finishHandshake(HandshakeResult result) + { + // clean up the connector instances before changing the state + if (connectionTimeoutFuture != null) + { + connectionTimeoutFuture.cancel(false); + connectionTimeoutFuture = null; + } + if (connectionRetryFuture != null) + { + connectionRetryFuture.cancel(false); + connectionRetryFuture = null; + } + connectAttemptCount = 0; + + if (result.negotiatedMessagingVersion != HandshakeResult.UNKNOWN_PROTOCOL_VERSION) + { + targetVersion = result.negotiatedMessagingVersion; + MessagingService.instance().setVersion(connectionId.remote(), targetVersion); + } + + switch (result.outcome) + { + case SUCCESS: + assert result.channelWriter != null; + logger.debug("successfully connected to {}, conmpress = {}, coalescing = {}", connectionId, + shouldCompressConnection(connectionId.local(), connectionId.remote()), + coalescingStrategy.isPresent() ? 
coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED); + if (state.get() == State.CLOSED) + { + result.channelWriter.close(); + backlog.clear(); + break; + } + channelWriter = result.channelWriter; + // drain the backlog to the channel + channelWriter.writeBacklog(backlog, true); + // change the state so newly incoming messages can be sent to the channel (without adding to the backlog) + setStateIfNotClosed(state, State.READY); + // ship out any stragglers that got added to the backlog + channelWriter.writeBacklog(backlog, true); + break; + case DISCONNECT: + reconnect(); + break; + case NEGOTIATION_FAILURE: + setStateIfNotClosed(state, State.NOT_READY); + backlog.clear(); + break; + default: + throw new IllegalArgumentException("unhandled result type: " + result.outcome); + } + } + + @VisibleForTesting + static boolean setStateIfNotClosed(AtomicReference<State> state, State newState) + { + State s = state.get(); + if (s == State.CLOSED) + return false; + state.set(newState); + return true; + } + + int getTargetVersion() + { + return targetVersion; + } + + /** + * Handles the result of each message sent. + * + * Note: this function is expected to be invoked on the netty event loop. Also, do not retain any state from + * the input {@code messageResult}. + */ + void handleMessageResult(MessageResult messageResult) + { + completedMessageCount.incrementAndGet(); + + // checking the cause() is an optimized way to tell if the operation was successful (as the cause will be null) + // Note that ExpiredException is just a marker for timeout-ed message we're dropping, but as we already + // incremented the dropped message count in MessageOutHandler, we have nothing to do. 
+ Throwable cause = messageResult.future.cause(); + if (cause == null) + return; + + if (cause instanceof ExpiredException) + { + droppedMessageCount.incrementAndGet(); + return; + } + + JVMStabilityInspector.inspectThrowable(cause); + + if (cause instanceof IOException || cause.getCause() instanceof IOException) + { + ChannelWriter writer = messageResult.writer; + if (writer.shouldPurgeBacklog()) + purgeBacklog(); + + // This writer needs to be closed and we need to trigger a reconnection. We really only want to do that + // once for this channel however (and again, no race because we're on the netty event loop). + if (!writer.isClosed() && messageResult.allowReconnect) + { + reconnect(); + writer.close(); + } + + QueuedMessage msg = messageResult.msg; + if (msg != null && msg.shouldRetry()) + { + sendMessage(msg.createRetry()); + } + } + else if (messageResult.future.isCancelled()) + { + // Someone cancelled the future, which we assume meant it doesn't want the message to be sent if it hasn't + // yet. Just ignore. + } + else + { + // Non IO exceptions are likely a programming error so let's not silence them + logger.error("Unexpected error writing on " + connectionId, cause); + } + } + + /** + * Change the IP address on which we connect to the peer. We will attempt to connect to the new address if there + * was a previous connection, and new incoming messages as well as existing {@link #backlog} messages will be sent there. + * Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from + * one channel to another). 
+ */ + void reconnectWithNewIp(InetSocketAddress newAddr) + { + State currentState = state.get(); + + // if we're closed, ignore the request + if (currentState == State.CLOSED) + return; + + // capture a reference to the current channel, in case it gets swapped out before we can call close() on it + ChannelWriter currentChannel = channelWriter; + connectionId = connectionId.withNewConnectionAddress(newAddr); + + if (currentState != State.NOT_READY) + reconnect(); + + // lastly, push through anything remaining in the existing channel. + if (currentChannel != null) + currentChannel.softClose(); + } + + /** + * Sets the state properly so {@link #connect()} can attempt to reconnect. + */ + void reconnect() + { + if (setStateIfNotClosed(state, State.NOT_READY)) + connect(); + } + + void purgeBacklog() + { + backlog.clear(); + } + + public void close(boolean softClose) + { + state.set(State.CLOSED); + + if (connectionTimeoutFuture != null) + { + connectionTimeoutFuture.cancel(false); + connectionTimeoutFuture = null; + } + + // drain the backlog + if (channelWriter != null) + { + if (softClose) + { + channelWriter.writeBacklog(backlog, false); + channelWriter.softClose(); + } + else + { + backlog.clear(); + channelWriter.close(); + } + + channelWriter = null; + } + } + + @Override + public String toString() + { + return connectionId.toString(); + } + + public Integer getPendingMessages() + { + int pending = backlog.size(); + ChannelWriter chan = channelWriter; + if (chan != null) + pending += (int)chan.pendingMessageCount(); + return pending; + } + + public Long getCompletedMessages() + { + return completedMessageCount.get(); + } + + public Long getDroppedMessages() + { + return droppedMessageCount.get(); + } + + /* + methods specific to testing follow + */ + + @VisibleForTesting + int backlogSize() + { + return backlog.size(); + } + + @VisibleForTesting + void addToBacklog(QueuedMessage msg) + { + backlog.add(msg); + } + + @VisibleForTesting + void 
setChannelWriter(ChannelWriter channelWriter) + { + this.channelWriter = channelWriter; + } + + @VisibleForTesting + ChannelWriter getChannelWriter() + { + return channelWriter; + } + + @VisibleForTesting + void setState(State state) + { + this.state.set(state); + } + + @VisibleForTesting + State getState() + { + return state.get(); + } + + @VisibleForTesting + void setTargetVersion(int targetVersion) + { + this.targetVersion = targetVersion; + } + + @VisibleForTesting + OutboundConnectionIdentifier getConnectionId() + { + return connectionId; + } + + @VisibleForTesting + void setConnectionTimeoutFuture(ScheduledFuture<?> connectionTimeoutFuture) + { + this.connectionTimeoutFuture = connectionTimeoutFuture; + } + + @VisibleForTesting + ScheduledFuture<?> getConnectionTimeoutFuture() + { + return connectionTimeoutFuture; + } + + public boolean isConnected() + { + return state.get() == State.READY; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java new file mode 100644 index 0000000..0086da8 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/OutboundMessagingPool.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.net.async; + +import java.net.InetSocketAddress; +import java.util.Optional; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.metrics.ConnectionMetrics; +import org.apache.cassandra.net.BackPressureState; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType; +import org.apache.cassandra.utils.CoalescingStrategies; +import org.apache.cassandra.utils.CoalescingStrategies.CoalescingStrategy; + +/** + * Groups a set of outbound connections to a given peer, and routes outgoing messages to the appropriate connection + * (based upon message's type or size). Contains a {@link OutboundMessagingConnection} for each of the + * {@link ConnectionType} type. 
+ */ +public class OutboundMessagingPool +{ + @VisibleForTesting + static final long LARGE_MESSAGE_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "otcp_large_message_threshold", 1024 * 64); + + private final ConnectionMetrics metrics; + private final BackPressureState backPressureState; + + public OutboundMessagingConnection gossipChannel; + public OutboundMessagingConnection largeMessageChannel; + public OutboundMessagingConnection smallMessageChannel; + + /** + * An override address on which to communicate with the peer. Typically used for something like EC2 public IP addresses + * which need to be used for communication between EC2 regions. + */ + private InetSocketAddress preferredRemoteAddr; + + public OutboundMessagingPool(InetSocketAddress remoteAddr, InetSocketAddress localAddr, ServerEncryptionOptions encryptionOptions, + BackPressureState backPressureState, IInternodeAuthenticator authenticator) + { + preferredRemoteAddr = remoteAddr; + this.backPressureState = backPressureState; + metrics = new ConnectionMetrics(localAddr.getAddress(), this); + + + smallMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.small(localAddr, preferredRemoteAddr), + encryptionOptions, coalescingStrategy(remoteAddr), authenticator); + largeMessageChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.large(localAddr, preferredRemoteAddr), + encryptionOptions, coalescingStrategy(remoteAddr), authenticator); + + // don't attempt coalesce the gossip messages, just ship them out asap (let's not anger the FD on any peer node by any artificial delays) + gossipChannel = new OutboundMessagingConnection(OutboundConnectionIdentifier.gossip(localAddr, preferredRemoteAddr), + encryptionOptions, Optional.empty(), authenticator); + } + + private static Optional<CoalescingStrategy> coalescingStrategy(InetSocketAddress remoteAddr) + { + String strategyName = DatabaseDescriptor.getOtcCoalescingStrategy(); + String displayName = 
remoteAddr.getAddress().getHostAddress(); + return CoalescingStrategies.newCoalescingStrategy(strategyName, + DatabaseDescriptor.getOtcCoalescingWindow(), + OutboundMessagingConnection.logger, + displayName); + + } + + public BackPressureState getBackPressureState() + { + return backPressureState; + } + + public void sendMessage(MessageOut msg, int id) + { + getConnection(msg).sendMessage(msg, id); + } + + @VisibleForTesting + public OutboundMessagingConnection getConnection(MessageOut msg) + { + // optimize for the common path (the small message channel) + if (Stage.GOSSIP != msg.getStage()) + { + return msg.serializedSize(smallMessageChannel.getTargetVersion()) < LARGE_MESSAGE_THRESHOLD + ? smallMessageChannel + : largeMessageChannel; + } + return gossipChannel; + } + + /** + * Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the + * current channel. Typically this function is used for something like EC2 public IP addresses which need to be used + * for communication between EC2 regions. + * + * @param addr IP Address to use (and prefer) going forward for connecting to the peer + */ + public void reconnectWithNewIp(InetSocketAddress addr) + { + preferredRemoteAddr = addr; + gossipChannel.reconnectWithNewIp(addr); + largeMessageChannel.reconnectWithNewIp(addr); + smallMessageChannel.reconnectWithNewIp(addr); + } + + /** + * Close each netty channel and it's socket. + * + * @param softClose {@code true} if existing messages in the queue should be sent before closing. + */ + public void close(boolean softClose) + { + gossipChannel.close(softClose); + largeMessageChannel.close(softClose); + smallMessageChannel.close(softClose); + } + + /** + * For testing purposes only. 
+ */ + @VisibleForTesting + OutboundMessagingConnection getConnection(ConnectionType connectionType) + { + switch (connectionType) + { + case GOSSIP: + return gossipChannel; + case LARGE_MESSAGE: + return largeMessageChannel; + case SMALL_MESSAGE: + return smallMessageChannel; + default: + throw new IllegalArgumentException("unsupported connection type: " + connectionType); + } + } + + public void incrementTimeout() + { + metrics.timeouts.mark(); + } + + public long getTimeouts() + { + return metrics.timeouts.getCount(); + } + + public InetSocketAddress getPreferredRemoteAddr() + { + return preferredRemoteAddr; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/356dc3c2/src/java/org/apache/cassandra/net/async/QueuedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/QueuedMessage.java b/src/java/org/apache/cassandra/net/async/QueuedMessage.java new file mode 100644 index 0000000..28e4ba4 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/QueuedMessage.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net.async; + +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.CoalescingStrategies; + +/** + * A wrapper for outbound messages. All messages will be retried once. + */ +public class QueuedMessage implements CoalescingStrategies.Coalescable +{ + public final MessageOut<?> message; + public final int id; + public final long timestampNanos; + public final boolean droppable; + private final boolean retryable; + + public QueuedMessage(MessageOut<?> message, int id) + { + this(message, id, System.nanoTime(), MessagingService.DROPPABLE_VERBS.contains(message.verb), true); + } + + @VisibleForTesting + public QueuedMessage(MessageOut<?> message, int id, long timestampNanos, boolean droppable, boolean retryable) + { + this.message = message; + this.id = id; + this.timestampNanos = timestampNanos; + this.droppable = droppable; + this.retryable = retryable; + } + + /** don't drop a non-droppable message just because it's timestamp is expired */ + public boolean isTimedOut() + { + return droppable && timestampNanos < System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(message.getTimeout()); + } + + public boolean shouldRetry() + { + return retryable; + } + + public QueuedMessage createRetry() + { + return new QueuedMessage(message, id, System.nanoTime(), droppable, false); + } + + public long timestampNanos() + { + return timestampNanos; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org