Hi,
I was following the proxy example to understand how it deals with
backpressure. The high-level idea is to set autoRead to false and call
ctx.channel().read() on the frontend channel only when needed. In general,
the chain is
inboundChannel.read() -> outboundChannel.write() -> inboundChannel.read()
-> outboundChannel.write() -> ...
However, I found one issue in the current example (specifically, in
*HexDumpProxyFrontendHandler*):
Step 0: the frontend channel reads some content, which triggers
channelRead(), which in turn calls outboundChannel.writeAndFlush();
Step 1: once writeAndFlush() succeeds, the registered listener calls
ctx.channel().read() on the inboundChannel again;
Step 2: if for some reason the inboundChannel has nothing to read at that
moment (maybe data just arrives too slowly), the inboundChannel's
channelRead() will not be triggered. Also, since autoRead is set to false,
the inboundChannel will not be read later. So the chain will be broken.
If this analysis is correct, then there could be a bug in Netty's proxy
example.
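To make the question concrete, here is a small toy model in plain Java. It is not Netty code: ToyChannel and ToyProxyDemo are made-up names, and the model assumes that read() with autoRead off leaves a pending read request that is served when data arrives later. Under that assumption the chain survives a slow sender. If a real channel instead dropped the request when nothing was available, the chain would stall as described in Step 2. Which of the two Netty actually does is what needs checking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Drives the toy channel the way the frontend handler drives the inbound channel. */
final class ToyProxyDemo {
    static List<String> run() {
        List<String> forwarded = new ArrayList<>();
        ToyChannel inbound = new ToyChannel();
        inbound.onRead(chunk -> {
            forwarded.add(chunk); // stands in for outboundChannel.writeAndFlush(msg)
            inbound.read();       // stands in for the listener calling ctx.channel().read()
        });
        inbound.read();           // first read, as in channelActive()
        inbound.dataArrives("a"); // Step 0 and Step 1: forwarded, then read() is called again
        // Step 2: nothing is buffered here, so the read() issued above stays pending.
        inbound.dataArrives("b"); // slow data shows up later and serves the pending read
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b]
    }
}

/** Toy stand-in for a channel with autoRead disabled (assumed semantics, not Netty's code). */
final class ToyChannel {
    private final Queue<String> buffered = new ArrayDeque<>(); // arrived but not yet delivered
    private boolean readPending;                               // set by read(), cleared on delivery
    private Consumer<String> handler = chunk -> { };

    void onRead(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Requests one chunk. If none is buffered, the request stays pending. */
    void read() {
        readPending = true;
        deliverIfPossible();
    }

    /** Simulates bytes arriving from the network. */
    void dataArrives(String chunk) {
        buffered.add(chunk);
        deliverIfPossible();
    }

    private void deliverIfPossible() {
        if (readPending && !buffered.isEmpty()) {
            readPending = false;
            handler.accept(buffered.poll());
        }
    }
}
```

In this model the second chunk is still forwarded even though the read() after the first write found nothing to read, because the request is remembered rather than discarded.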
Below is the code for the FrontendHandler.
Thanks,
Ming
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;

public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {

    private final String remoteHost;
    private final int remotePort;
    private volatile Channel outboundChannel;

    public HexDumpProxyFrontendHandler(String remoteHost, int remotePort) {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        final Channel inboundChannel = ctx.channel();

        // Start the connection attempt.
        Bootstrap b = new Bootstrap();
        b.group(inboundChannel.eventLoop())
         .channel(ctx.channel().getClass())
         .handler(new HexDumpProxyBackendHandler(inboundChannel))
         .option(ChannelOption.AUTO_READ, false);
        ChannelFuture f = b.connect(remoteHost, remotePort);
        outboundChannel = f.channel();
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // connection complete start to read first data
                    inboundChannel.read();
                } else {
                    // Close the connection if the connection attempt has failed.
                    inboundChannel.close();
                }
            }
        });
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) {
        if (outboundChannel.isActive()) {
            outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        // was able to flush out data, start to read the next chunk
                        ctx.channel().read();
                    } else {
                        future.channel().close();
                    }
                }
            });
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (outboundChannel != null) {
            closeOnFlush(outboundChannel);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        closeOnFlush(ctx.channel());
    }

    /**
     * Closes the specified channel after all queued write requests are flushed.
     */
    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}