Hi,

I was following the proxy example to understand how it deals with 
backpressure. The high-level idea is to set autoRead to false and call 
ctx.channel().read() on the frontend channel only when needed. In general 
the chain is inboundChannel.read() -> outboundChannel.write() -> 
inboundChannel.read() -> outboundChannel.write() -> ...
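
To make the chain concrete, here is a toy model of it in plain Java. It is 
only a sketch and not Netty code: the class ReadWriteChainSketch and its 
methods are made-up stand-ins for inboundChannel.read(), channelRead() and 
outboundChannel.writeAndFlush(), and everything runs synchronously on one 
thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model of the read -> write -> read chain; not Netty code. */
final class ReadWriteChainSketch {
    private final Deque<String> inboundData = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    ReadWriteChainSketch(List<String> chunks) {
        inboundData.addAll(chunks);
    }

    /** Stands in for inboundChannel.read(): delivers at most one chunk. */
    void read() {
        String chunk = inboundData.poll();
        if (chunk != null) {
            log.add("read " + chunk);
            channelRead(chunk);
        }
    }

    /** Stands in for channelRead(): forward the chunk, then ask for more. */
    private void channelRead(String chunk) {
        writeAndFlush(chunk, this::read);
    }

    /** Stands in for outboundChannel.writeAndFlush() plus its success listener. */
    private void writeAndFlush(String chunk, Runnable onSuccess) {
        log.add("write " + chunk);
        onSuccess.run();
    }

    public static void main(String[] args) {
        ReadWriteChainSketch chain =
                new ReadWriteChainSketch(Arrays.asList("a", "b", "c"));
        chain.read(); // the first read, issued once the backend connection is up
        chain.log.forEach(System.out::println);
    }
}
```

Each chunk is read only after the previous write has completed, so the log 
alternates strictly between "read" and "write" entries. In this model the 
chain simply ends once no more data is queued.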

However, I found one issue in the current example (specifically, in 
*HexDumpProxyFrontendHandler*):
Step 0: the frontend channel reads some content, which triggers 
channelRead(), which in turn calls outboundChannel.writeAndFlush();
Step 1: once writeAndFlush() succeeds, the registered listener calls 
ctx.channel().read() on the inboundChannel again;
Step 2: if for some reason the inboundChannel has nothing to read (maybe 
data just arrives too slowly), the inboundChannel's channelRead() will not 
be triggered. Also, since autoRead is set to false, the inboundChannel will 
not be read later. So the chain will be broken.
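
Whether the chain really breaks in Step 2 depends on what a read() request 
does when no data is available at that moment. The toy model below (plain 
Java, not Netty code; PendingReadSketch and the keepRequestPending switch 
are hypothetical names) contrasts the two possibilities: the request is 
dropped, or it is remembered until data arrives. It makes no claim about 
which of the two Netty actually implements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of Step 2; not Netty code. */
final class PendingReadSketch {
    private final boolean keepRequestPending; // hypothetical switch between the two behaviours
    private final Deque<String> arrived = new ArrayDeque<>();
    private boolean readPending;
    final List<String> forwarded = new ArrayList<>();

    PendingReadSketch(boolean keepRequestPending) {
        this.keepRequestPending = keepRequestPending;
    }

    /** Stands in for inboundChannel.read(). */
    void read() {
        String chunk = arrived.poll();
        if (chunk != null) {
            forwarded.add(chunk); // channelRead() -> writeAndFlush() succeeds
            read();               // the write listener asks for the next chunk
        } else if (keepRequestPending) {
            readPending = true;   // remember the request until data shows up
        }
        // otherwise the request is lost, which is the scenario in Step 2
    }

    /** Data reaches the inbound side some time after read() was called. */
    void dataArrives(String chunk) {
        arrived.add(chunk);
        if (readPending) {
            readPending = false;
            read();
        }
    }

    public static void main(String[] args) {
        for (boolean keep : new boolean[] {false, true}) {
            PendingReadSketch inbound = new PendingReadSketch(keep);
            inbound.read();              // Step 1: nothing has arrived yet
            inbound.dataArrives("late"); // Step 2: data arrives afterwards
            System.out.println("keepRequestPending=" + keep
                    + " forwarded=" + inbound.forwarded);
        }
    }
}
```

With keepRequestPending set to false the late chunk is never forwarded, which 
is the stall described above. With it set to true the chunk is forwarded as 
soon as it arrives and the chain continues.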

If this analysis is correct, there may be a bug in Netty's proxy example.

Below is the code for the FrontendHandler.

Thanks,
Ming

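import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {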

    private final String remoteHost;
    private final int remotePort;

    private volatile Channel outboundChannel;

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(inboundChannel))
         .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(remoteHost, remotePort);
        outboundChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete, start to read the first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
