So I have achieved what I was after, to a certain extent, by overriding channelReadComplete() in my TCPCodec, and only calling super.channelReadComplete() if the number of in flight packets is below the threshold.
This is now working for the FileChannel class. This same approach does not appear to work for an NioSocketChannel, though. And ... the reason for this is that my custom extension of FileChannel limits reads to 60 bytes at a time by overriding available(), and I haven't figured out how to do something similar for NioSocketChannel. So NioSocketChannel reads e.g. 9000 bytes of available data from the socket, then passes that to a ByteToMessageDecoder, which is obliged to process the whole amount of data, resulting in 150 messages before my channelReadComplete() method is called. Sigh! Any suggestions on how to constrain how many bytes are read from the socket at once? Regards, Rogan On Fri, Mar 3, 2017 at 3:25 PM Rogan Dawes <[email protected]> wrote: So I put some code back in to trigger a read() on the channel only if the number of "in flight" packets is below the threshold. I also made sure to set the channel autoread to false. However, it continues to read continuously, regardless of this. This seems to be as a result of this code path: Thread [pool-2-thread-1] (Suspended (breakpoint at line 111 in AbstractOioChannel)) Main$ChunkedFileChannel(AbstractOioChannel).setReadPending(boolean) line: 111 Main$ChunkedFileChannel(AbstractOioChannel).doBeginRead() line: 100 FileChannel$FileChannelUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 756 DefaultChannelPipeline$HeadContext.read(ChannelHandlerContext) line: 1273 ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 150 DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 309 AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeRead(ChannelHandlerContext) line: 127 DefaultChannelHandlerContext(AbstractChannelHandlerContext).read() line: 526 DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 433 TCPCodec(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) line: 155 ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) line: 92 DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) line: 167 AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) line: 91 DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 412 ChunkedFrameDecoder(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) line: 155 ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) line: 92 DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) line: 167 AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) line: 91 DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 412 DefaultChannelPipeline.fireChannelReadComplete() line: 962 Main$ChunkedFileChannel(AbstractOioByteChannel).doRead() line: 153 AbstractOioChannel$1.run() line: 44 ThreadPerChannelEventLoop.run() line: 52 SingleThreadEventExecutor$2.run() line: 116 Thread.run() line: 745 As expected my manual invocation of read() sets readPending to true, then triggers the AbstractOioChannel readTask, which does a read, sets readPending to false, and calls fireChannelReadComplete(). So far, so good. However, I set a breakpoint on setReadPending, (resulting in this stack trace), and saw that read() was being invoked again, independently of my own code. This appears to be as a result of: DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 433 which calls read() again, as a result of this code: /** * At this point, we are sure the handler of this context did not produce anything and * we suppressed the {@code channelReadComplete()} event. * * If the next handler invoked {@link #read()} to read something but nothing was produced * by the handler of this context, someone has to issue another {@link #read()} operation * until the handler of this context produces something. * * Why? Because otherwise the next handler will not receive {@code channelRead()} nor * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued. */ if (invokedPrevRead && !channel().config().isAutoRead()) { /** * The next (or upstream) handler invoked {@link #read()}, but it didn't get any * {@code channelRead()} event. We should read once more, so that the handler of the current * context have a chance to produce something. */ read(); } else { invokedPrevRead = false; } However, a message was produced, so I don't really understand what is going on here. Rogan On Thursday, March 2, 2017 at 4:35:10 PM UTC+2, Rogan Dawes wrote: Hi Norman, The code is available here: https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy The important stuff is in Main, since there is not actually a lot going on. I have removed all the code that handles slowing the stream down, as I was unable to figure out how to get it to work, but you can see the FileChannel implementation, at least. I have had some interim success by overriding OioByteStreamChannel.available(), and limiting it to 60 bytes max, so now I need to reintroduce the limits on the number of packets in flight. I'm not sure if this is the right way, and also not sure of the best way to do it for a SocketChannel. I was actually thinking that using a custom ByteBufAllocator that only creates ByteBuf's with a max capacity of 60 bytes would be another approach, that would work for both types of channel. What do you think of this idea? Rogan On Thu, Mar 2, 2017 at 3:19 PM 'Norman Maurer' via Netty discussions < [email protected]> wrote: The solution would be to ensure the read() can only produce a limited number of bytes. I am not sure I understand how you implemented this kind of stuff tho. On 2. Mar 2017, at 13:34, Rogan Dawes <[email protected]> wrote: Nobody got any hints for me on this? I tried spinning in the ChannelHandler, waiting for the ack count to increment, but that simply blocked handling of any incoming messages containing the incremented ack :-( Is there no way to defer handling of a message until a later point in time, triggered by performing a read(), for example? Rogan On Tue, Feb 28, 2017 at 8:56 AM Rogan Dawes <[email protected]> wrote: So I have one major problem that is getting in the way, and that is managing flow control over the multiplexed link. I send a packet with a particular sequence number, and the receiver must ACK it, much like TCP. Each packet may be a maximum of 60 bytes, and I can have a maximum of 14 packets in flight at any one time (i.e. unacked), for performance reasons. In theory, this should allow for retransmission of any missing packets, but in reality, this results in flow control, which is also desirable. However, the one flow control aspect I don't have a good handle on is how to stop reading from the upstream channel. In one case, the Channel is a file, and the ByteBuf that is read contains the entire contents, being 10000 bytes. I have a "chunking handler" that chops this up into 60 byte chunks, which is fine, however, the thread tries to write all 167 chunks one after the other, without giving the receiver a chance to ack. How can I tell the pipeline to stop feeding me ByteBufs for the moment, but to resume when I call read() on the channel? AutoRead is already false, but that doesn't seem to help, because the very first read() results in far too much data being available, and it "must" be processed! Thanks! Rogan On Monday, February 20, 2017 at 9:26:14 AM UTC+2, Rogan Dawes wrote: Thanks for the response, it's good to know that I am at least on the right track. Rogan On Wed, Feb 15, 2017 at 5:32 PM Leonardo Gomes <[email protected]> wrote: Hi Rogan, That sounds like a good structure. You may want to check the Proxy example as well: https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy You may want to look into MessageToByteEncoder as well. I've written a similar thing, but with a pool of channels (using Netty's pool) as I didn't have any requirement to write on a specific outbound socket, but could take any from a pool of sockets I ended up creating. Cheers, Leo. On Tue, Feb 14, 2017 at 4:58 PM, Rogan Dawes <[email protected]> wrote: Hi, I'm trying to use netty to implement a multiplexer/demultiplexer. Think multiple streams travelling together over a single socket, then getting spread out into individual sockets in my code. I'm basing it on the example socks proxy code, which seems to have the basics in place (server and client together). However, I'm trying to figure out how best to deal with a couple of requirements: The incoming traffic (call it downstream) has a stream number, which is used to identify which stream it belongs to, followed by some flags, sequence numbers, a length byte, and 60 bytes of data (of which "length" are usable). Depending on the flags, I will have to open a new client socket, and link it to the relevant stream number. It seems to me that the server pipeline will have a simple handler that essentially looks up the channel/pipeline associated with a particular stream, or, if it does not exist, creates it. Each "client pipeline" (call it upstream) will have a first handler that knows how to process the flags, verify the sequence numbers, and extract the actual data, then write that data into the pipeline. It seems like this could be implemented as a MessageToMessageCodec, right? Packet comes in, ByteBuf of actual data goes out, to be written to the stream. In the reverse direction, any data coming from upstream will have to be broken up into chunks of max 60 bytes, then put into a "Packet" that simply gets written to the downstream channel. Does this make sense? Does it seem like a reasonable way of structuring the pipelines? Thanks for your help. Rogan -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com <https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com?utm_medium=email&utm_source=footer> . For more options, visit https://groups.google.com/d/optout. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com <https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com?utm_medium=email&utm_source=footer> . For more options, visit https://groups.google.com/d/optout. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com <https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com?utm_medium=email&utm_source=footer> . For more options, visit https://groups.google.com/d/optout. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com <https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com?utm_medium=email&utm_source=footer> . For more options, visit https://groups.google.com/d/optout. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com <https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com?utm_medium=email&utm_source=footer> . For more options, visit https://groups.google.com/d/optout. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com <https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com?utm_medium=email&utm_source=footer> . For more options, visit https://groups.google.com/d/optout. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/CAOYdKdhwd3h9fK5%3De-gLQNqN%3D9KLu3sVpWqwXD7TnX8d0%2BRmmQ%40mail.gmail.com. For more options, visit https://groups.google.com/d/optout.
