Github user jasobrown commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216157096 --- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java --- @@ -18,143 +18,296 @@ package org.apache.cassandra.net.async; -import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; -import java.util.Collections; -import java.util.EnumMap; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import org.apache.cassandra.io.util.DataInputBuffer; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ReferenceCountUtil; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.exceptions.UnknownTableException; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.ParameterType; -import org.apache.cassandra.utils.vint.VIntCoding; +import org.apache.cassandra.net.MessageIn.MessageInProcessor; /** * Parses incoming messages as per the 4.0 internode messaging protocol. 
*/ -public class MessageInHandler extends BaseMessageInHandler +public class MessageInHandler extends ChannelInboundHandlerAdapter { public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class); - private MessageHeader messageHeader; + private final InetAddressAndPort peer; - MessageInHandler(InetAddressAndPort peer, int messagingVersion) + private final BufferHandler bufferHandler; + private volatile boolean closed; + + public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages) + { + this.peer = peer; + + bufferHandler = handlesLargeMessages + ? new BlockingBufferHandler(messageProcessor) + : new NonblockingBufferHandler(messageProcessor); + } + + public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException + { + if (!closed) + { + bufferHandler.channelRead(ctx, (ByteBuf) msg); + } + else + { + ReferenceCountUtil.release(msg); + ctx.close(); + } + } + + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + { + if (cause instanceof EOFException) + logger.trace("eof reading from socket; closing", cause); + else if (cause instanceof UnknownTableException) + logger.warn("Got message from unknown table while reading from socket; closing", cause); + else if (cause instanceof IOException) + logger.trace("IOException reading from socket; closing", cause); + else + logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause); + + close(); + ctx.close(); + } + + public void channelInactive(ChannelHandlerContext ctx) + { + logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress()); + close(); + ctx.fireChannelInactive(); + } + + void close() { - this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER); + closed = true; + bufferHandler.close(); } - public MessageInHandler(InetAddressAndPort peer, int messagingVersion, 
BiConsumer<MessageIn, Integer> messageConsumer) + boolean isClosed() { - super(peer, messagingVersion, messageConsumer); + return closed; + } - assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher", - messagingVersion, MessagingService.VERSION_40); - state = State.READ_FIRST_CHUNK; + /** + * An abstraction around how incoming buffers are handled: either in a non-blocking manner ({@link NonblockingBufferHandler}) + * or in a blocking manner ({@link BlockingBufferHandler}). + * + * The methods declared here will only be invoked on the netty event loop. + */ + interface BufferHandler + { + void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException; + + void close(); } /** - * For each new message coming in, builds up a {@link MessageHeader} instance incrementally. This method - * attempts to deserialize as much header information as it can out of the incoming {@link ByteBuf}, and - * maintains a trivial state machine to remember progress across invocations. + * Processes incoming buffers on the netty event loop, in a non-blocking manner. If a buffer is not completely consumed, + * it is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is combined with it. */ - @SuppressWarnings("resource") - public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception + class NonblockingBufferHandler implements BufferHandler --- End diff -- done
--- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org