GitHub user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216157096
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
    @@ -18,143 +18,296 @@
     
     package org.apache.cassandra.net.async;
     
    -import java.io.DataInputStream;
    +import java.io.EOFException;
     import java.io.IOException;
    -import java.util.Collections;
    -import java.util.EnumMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.function.BiConsumer;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.RejectedExecutionHandler;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
    -import com.google.common.primitives.Ints;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import io.netty.buffer.ByteBuf;
     import io.netty.channel.ChannelHandlerContext;
    -import org.apache.cassandra.io.util.DataInputBuffer;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +import io.netty.handler.codec.ByteToMessageDecoder;
    +import io.netty.util.ReferenceCountUtil;
    +import org.apache.cassandra.concurrent.NamedThreadFactory;
    +import org.apache.cassandra.exceptions.UnknownTableException;
     import org.apache.cassandra.locator.InetAddressAndPort;
    -import org.apache.cassandra.net.MessageIn;
    -import org.apache.cassandra.net.MessagingService;
    -import org.apache.cassandra.net.ParameterType;
    -import org.apache.cassandra.utils.vint.VIntCoding;
    +import org.apache.cassandra.net.MessageIn.MessageInProcessor;
     
     /**
      * Parses incoming messages as per the 4.0 internode messaging protocol.
      */
    -public class MessageInHandler extends BaseMessageInHandler
    +public class MessageInHandler extends ChannelInboundHandlerAdapter
     {
         public static final Logger logger = 
LoggerFactory.getLogger(MessageInHandler.class);
     
    -    private MessageHeader messageHeader;
    +    private final InetAddressAndPort peer;
     
    -    MessageInHandler(InetAddressAndPort peer, int messagingVersion)
    +    private final BufferHandler bufferHandler;
    +    private volatile boolean closed;
    +
    +    public MessageInHandler(InetAddressAndPort peer, MessageInProcessor 
messageProcessor, boolean handlesLargeMessages)
    +    {
    +        this.peer = peer;
    +
    +        bufferHandler = handlesLargeMessages
    +                        ? new BlockingBufferHandler(messageProcessor)
    +                        : new NonblockingBufferHandler(messageProcessor);
    +    }
    +
    +    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException
    +    {
    +        if (!closed)
    +        {
    +            bufferHandler.channelRead(ctx, (ByteBuf) msg);
    +        }
    +        else
    +        {
    +            ReferenceCountUtil.release(msg);
    +            ctx.close();
    +        }
    +    }
    +
    +    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    +    {
    +        if (cause instanceof EOFException)
    +            logger.trace("eof reading from socket; closing", cause);
    +        else if (cause instanceof UnknownTableException)
    +            logger.warn("Got message from unknown table while reading from 
socket; closing", cause);
    +        else if (cause instanceof IOException)
    +            logger.trace("IOException reading from socket; closing", 
cause);
    +        else
    +            logger.warn("Unexpected exception caught in inbound channel 
pipeline from " + ctx.channel().remoteAddress(), cause);
    +
    +        close();
    +        ctx.close();
    +    }
    +
    +    public void channelInactive(ChannelHandlerContext ctx)
    +    {
    +        logger.trace("received channel closed message for peer {} on local 
addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress());
    +        close();
    +        ctx.fireChannelInactive();
    +    }
    +
    +    void close()
         {
    -        this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
    +        closed = true;
    +        bufferHandler.close();
         }
     
    -    public MessageInHandler(InetAddressAndPort peer, int messagingVersion, 
BiConsumer<MessageIn, Integer> messageConsumer)
    +    boolean isClosed()
         {
    -        super(peer, messagingVersion, messageConsumer);
    +        return closed;
    +    }
     
    -        assert messagingVersion >= MessagingService.VERSION_40 : 
String.format("wrong messaging version for this handler: got %d, but expect %d 
or higher",
    -                                                                           
   messagingVersion, MessagingService.VERSION_40);
    -        state = State.READ_FIRST_CHUNK;
    +    /**
    +     * An abstraction around how incoming buffers are handled: either in a 
non-blocking manner ({@link NonblockingBufferHandler})
    +     * or in a blocking manner ({@link BlockingBufferHandler}).
    +     *
    +     * The methods declared here will only be invoked on the netty event 
loop.
    +     */
    +    interface BufferHandler
    +    {
    +        void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws 
IOException;
    +
    +        void close();
         }
     
         /**
    -     * For each new message coming in, builds up a {@link MessageHeader} 
instance incrementally. This method
    -     * attempts to deserialize as much header information as it can out of 
the incoming {@link ByteBuf}, and
    -     * maintains a trivial state machine to remember progress across 
invocations.
    +     * Processes incoming buffers on the netty event loop, in a 
non-blocking manner. If buffers are not completely consumed,
    +     * it is stashed in {@link #retainedInlineBuffer}, and the next 
incoming buffer is combined with it.
          */
    -    @SuppressWarnings("resource")
    -    public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception
    +    class NonblockingBufferHandler implements BufferHandler
    --- End diff --
    
    done


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to