You could just use the `FixedRecvByteBufAllocator` and set it to the max number of bytes you want to read.
> On 3. Mar 2017, at 15:49, Rogan Dawes <[email protected]> wrote: > > So I have achieved what I was after, to a certain extent, by overriding > channelReadComplete() in my TCPCodec, and only calling > super.channelReadComplete() if the number of in flight packets is below the > threshold. > > This is now working for the FileChannel class. This same approach does not > appear to work for an NioSocketChannel, though. > > And ... the reason for this is that my custom extension of FileChannel limits > reads to 60 bytes at a time by overriding available(), and I haven't figured > out how to do something similar for NioSocketChannel. So NioSocketChannel > reads e.g. 9000 bytes of available data from the socket, then passes that to > a ByteToMessageDecoder, which is obliged to process the whole amount of data, > resulting in 150 messages before my channelReadComplete() method is called. > > Sigh! Any suggestions on how to constrain how many bytes are read from the > socket at once? > > Regards, > > Rogan > > > On Fri, Mar 3, 2017 at 3:25 PM Rogan Dawes <[email protected] > <mailto:[email protected]>> wrote: > So I put some code back in to trigger a read() on the channel only if the > number of "in flight" packets is below the threshold. I also made sure to set > the channel autoread to false. However, it continues to read continuously, > regardless of this. This seems to be as a result of this code path: > > Thread [pool-2-thread-1] (Suspended (breakpoint at line 111 in > AbstractOioChannel)) > Main$ChunkedFileChannel(AbstractOioChannel).setReadPending(boolean) > line: 111 > Main$ChunkedFileChannel(AbstractOioChannel).doBeginRead() line: 100 > > FileChannel$FileChannelUnsafe(AbstractChannel$AbstractUnsafe).beginRead() > line: 756 > DefaultChannelPipeline$HeadContext.read(ChannelHandlerContext) line: > 1273 > ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: > 150 > DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: > 309 > > AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeRead(ChannelHandlerContext) > line: 127 > DefaultChannelHandlerContext(AbstractChannelHandlerContext).read() > line: 526 > > DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() > line: 433 > > TCPCodec(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) > line: 155 > > ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) > line: 92 > > DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) > line: 167 > > AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) > line: 91 > > DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() > line: 412 > > ChunkedFrameDecoder(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) > line: 155 > > ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) > line: 92 > > DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) > line: 167 > > AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) > line: 91 > > DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelReadComplete() > line: 412 > DefaultChannelPipeline.fireChannelReadComplete() line: 962 > Main$ChunkedFileChannel(AbstractOioByteChannel).doRead() line: 153 > AbstractOioChannel$1.run() line: 44 > ThreadPerChannelEventLoop.run() line: 52 > SingleThreadEventExecutor$2.run() line: 116 > Thread.run() line: 745 > > As expected my manual invocation of read() sets readPending to true, then > triggers the AbstractOioChannel readTask, which does a read, sets readPending > to false, and calls fireChannelReadComplete(). So far, so good. > > However, I set a breakpoint on setReadPending, (resulting in this stack > trace), and saw that read() was being invoked again, independently of my own > code. > > This appears to be as a result of: > > > DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() > line: 433 > > which calls read() again, as a result of this code: > > /** > > * At this point, we are sure the handler of this context did not > produce anything and > > * we suppressed the {@code channelReadComplete()} event. > > * > > * If the next handler invoked {@link #read()} to read something but > nothing was produced > > * by the handler of this context, someone has to issue another > {@link #read()} operation > > * until the handler of this context produces something. > > * > > * Why? Because otherwise the next handler will not receive {@code > channelRead()} nor > > * {@code channelReadComplete()} event at all for the {@link #read()} > operation it issued. > > */ > > if (invokedPrevRead && !channel().config().isAutoRead()) { > > /** > > * The next (or upstream) handler invoked {@link #read()}, but it > didn't get any > > * {@code channelRead()} event. We should read once more, so that > the handler of the current > > * context have a chance to produce something. > > */ > > read(); > > } else { > > invokedPrevRead = false; > > } > > > > However, a message was produced, so I don't really understand what is going > on here. > > > > Rogan > > > > > On Thursday, March 2, 2017 at 4:35:10 PM UTC+2, Rogan Dawes wrote: > Hi Norman, > > The code is available here: > > https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy > > <https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy> > > The important stuff is in Main, since there is not actually a lot going on. > > I have removed all the code that handles slowing the stream down, as I was > unable to figure out how to get it to work, but you can see the FileChannel > implementation, at least. I have had some interim success by overriding > OioByteStreamChannel.available(), and limiting it to 60 bytes max, so now I > need to reintroduce the limits on the number of packets in flight. I'm not > sure if this is the right way, and also not sure of the best way to do it for > a SocketChannel. > > I was actually thinking that using a custom ByteBufAllocator that only > creates ByteBuf's with a max capacity of 60 bytes would be another approach, > that would work for both types of channel. What do you think of this idea? > > Rogan > > > On Thu, Mar 2, 2017 at 3:19 PM 'Norman Maurer' via Netty discussions > <[email protected] <>> wrote: > The solution would be to ensure the read() can only produce a limited number > of bytes. I am not sure I understand how you implemented this kind of stuff > tho. > > > >> On 2. Mar 2017, at 13:34, Rogan Dawes <[email protected] <>> wrote: >> > >> Nobody got any hints for me on this? >> >> I tried spinning in the ChannelHandler, waiting for the ack count to >> increment, but that simply blocked handling of any incoming messages >> containing the incremented ack :-( >> >> Is there no way to defer handling of a message until a later point in time, >> triggered by performing a read(), for example? >> >> Rogan >> >> > >> On Tue, Feb 28, 2017 at 8:56 AM Rogan Dawes <[email protected] <>> wrote: > >> So I have one major problem that is getting in the way, and that is managing >> flow control over the multiplexed link. >> >> I send a packet with a particular sequence number, and the receiver must ACK >> it, much like TCP. Each packet may be a maximum of 60 bytes, and I can have >> a maximum of 14 packets in flight at any one time (i.e. unacked), for >> performance reasons. In theory, this should allow for retransmission of any >> missing packets, but in reality, this results in flow control, which is also >> desirable. >> >> However, the one flow control aspect I don't have a good handle on is how to >> stop reading from the upstream channel. In one case, the Channel is a file, >> and the ByteBuf that is read contains the entire contents, being 10000 >> bytes. I have a "chunking handler" that chops this up into 60 byte chunks, >> which is fine, however, the thread tries to write all 167 chunks one after >> the other, without giving the receiver a chance to ack. >> >> How can I tell the pipeline to stop feeding me ByteBufs for the moment, but >> to resume when I call read() on the channel? AutoRead is already false, but >> that doesn't seem to help, because the very first read() results in far too >> much data being available, and it "must" be processed! >> >> Thanks! >> >> Rogan >> >> >> On Monday, February 20, 2017 at 9:26:14 AM UTC+2, Rogan Dawes wrote: >> Thanks for the response, it's good to know that I am at least on the right >> track. >> >> Rogan >> >> >> On Wed, Feb 15, 2017 at 5:32 PM Leonardo Gomes <[email protected] <>> >> wrote: >> Hi Rogan, >> >> That sounds like a good structure. You may want to check the Proxy example >> as well: >> >> https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy >> >> <https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy> >> >> You may want to look into MessageToByteEncoder as well. >> >> I've written a similar thing, but with a pool of channels (using Netty's >> pool) as I didn't have any requirement to write on a specific outbound >> socket, but could take any from a pool of sockets I ended up creating. >> >> Cheers, >> Leo. >> >> >> On Tue, Feb 14, 2017 at 4:58 PM, Rogan Dawes <[email protected] <>> wrote: >> Hi, >> >> I'm trying to use netty to implement a multiplexer/demultiplexer. Think >> multiple streams travelling together over a single socket, then getting >> spread out into individual sockets in my code. >> >> I'm basing it on the example socks proxy code, which seems to have the >> basics in place (server and client together). However, I'm trying to figure >> out how best to deal with a couple of requirements: >> >> The incoming traffic (call it downstream) has a stream number, which is used >> to identify which stream it belongs to, followed by some flags, sequence >> numbers, a length byte, and 60 bytes of data (of which "length" are usable). >> Depending on the flags, I will have to open a new client socket, and link it >> to the relevant stream number. >> >> It seems to me that the server pipeline will have a simple handler that >> essentially looks up the channel/pipeline associated with a particular >> stream, or, if it does not exist, creates it. Each "client pipeline" (call >> it upstream) will have a first handler that knows how to process the flags, >> verify the sequence numbers, and extract the actual data, then write that >> data into the pipeline. It seems like this could be implemented as a >> MessageToMessageCodec, right? Packet comes in, ByteBuf of actual data goes >> out, to be written to the stream. >> >> In the reverse direction, any data coming from upstream will have to be >> broken up into chunks of max 60 bytes, then put into a "Packet" that simply >> gets written to the downstream channel. >> >> Does this make sense? Does it seem like a reasonable way of structuring the >> pipelines? >> >> Thanks for your help. >> >> Rogan >> >> >> -- >> You received this message because you are subscribed to the Google Groups >> "Netty discussions" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <>. >> >> To view this discussion on the web visit >> https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com >> >> <https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com?utm_medium=email&utm_source=footer>. >> For more options, visit https://groups.google.com/d/optout >> <https://groups.google.com/d/optout>. >> >> -- >> You received this message because you are subscribed to the Google Groups >> "Netty discussions" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <>. >> >> To view this discussion on the web visit >> https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com >> >> <https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com?utm_medium=email&utm_source=footer>. >> For more options, visit https://groups.google.com/d/optout >> <https://groups.google.com/d/optout>. >> >> -- >> You received this message because you are subscribed to the Google Groups >> "Netty discussions" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <>. > >> To view this discussion on the web visit >> https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com >> >> <https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com?utm_medium=email&utm_source=footer>. >> For more options, visit https://groups.google.com/d/optout >> <https://groups.google.com/d/optout>. > >> >> -- >> You received this message because you are subscribed to the Google Groups >> "Netty discussions" group. >> To unsubscribe from this group and stop receiving emails from it, send an >> email to [email protected] <>. > > >> To view this discussion on the web visit >> https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com >> >> <https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com?utm_medium=email&utm_source=footer>. > >> >> For more options, visit https://groups.google.com/d/optout >> <https://groups.google.com/d/optout>. > > > -- > You received this message because you are subscribed to the Google Groups > "Netty discussions" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected] <>. > To view this discussion on the web visit > https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com > > <https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com?utm_medium=email&utm_source=footer>. > For more options, visit https://groups.google.com/d/optout > <https://groups.google.com/d/optout>. > > -- > You received this message because you are subscribed to the Google Groups > "Netty discussions" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected] > <mailto:[email protected]>. > To view this discussion on the web visit > https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com > > <https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com?utm_medium=email&utm_source=footer>. > For more options, visit https://groups.google.com/d/optout > <https://groups.google.com/d/optout>. > > -- > You received this message because you are subscribed to the Google Groups > "Netty discussions" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected] > <mailto:[email protected]>. > To view this discussion on the web visit > https://groups.google.com/d/msgid/netty/CAOYdKdhwd3h9fK5%3De-gLQNqN%3D9KLu3sVpWqwXD7TnX8d0%2BRmmQ%40mail.gmail.com > > <https://groups.google.com/d/msgid/netty/CAOYdKdhwd3h9fK5%3De-gLQNqN%3D9KLu3sVpWqwXD7TnX8d0%2BRmmQ%40mail.gmail.com?utm_medium=email&utm_source=footer>. > For more options, visit https://groups.google.com/d/optout > <https://groups.google.com/d/optout>. -- You received this message because you are subscribed to the Google Groups "Netty discussions" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To view this discussion on the web visit https://groups.google.com/d/msgid/netty/D46ED5B8-7EDD-43D3-9AB9-ACCE2EBBBF01%40googlemail.com. For more options, visit https://groups.google.com/d/optout.
