You could just use the `FixedRecvByteBufAllocator` and set it to the max number 
of bytes you want to read.
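
A minimal sketch of that, assuming the Netty 4.1 API (class and option names may differ in other versions). `BoundedReadBootstrap`, `create` and `maxBytesPerRead` are made-up names for illustration; only `FixedRecvByteBufAllocator`, `ChannelOption.RCVBUF_ALLOCATOR` and `ChannelOption.AUTO_READ` are Netty's own:

```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

// Hypothetical helper, not part of Netty or of the code in this thread.
final class BoundedReadBootstrap {

    // Returns a client bootstrap whose receive buffers are always
    // maxBytesPerRead bytes large (e.g. 60 for the packets discussed here).
    static Bootstrap create(EventLoopGroup group, int maxBytesPerRead) {
        return new Bootstrap()
            .group(group)
            .channel(NioSocketChannel.class)
            // Reads happen only when a handler explicitly calls read().
            .option(ChannelOption.AUTO_READ, false)
            // Every receive buffer is allocated with this fixed capacity.
            .option(ChannelOption.RCVBUF_ALLOCATOR,
                    new FixedRecvByteBufAllocator(maxBytesPerRead))
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    // Your own handlers (e.g. the TCPCodec) would be added here.
                }
            });
    }
}
```

Note that, as far as I know, a single read() on an NIO channel in 4.1 may still loop over several socket reads, so this bounds the size of each buffer handed to the pipeline rather than guaranteeing exactly one buffer per read() call.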

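For the limit of 14 unacked packets described further down the thread, the bookkeeping could look roughly like this. It is plain Java with no Netty types; `SendWindow` and its method names are made up for illustration, and the numbers 60 and 14 are the ones from the thread:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical bookkeeping class: chunks payloads and counts unacked packets.
final class SendWindow {
    static final int MAX_CHUNK = 60;      // payload bytes per packet
    static final int MAX_IN_FLIGHT = 14;  // unacked packets allowed at once

    private int inFlight;

    // Cuts data into consecutive slices of at most MAX_CHUNK bytes.
    static List<byte[]> chunk(byte[] data) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < data.length; off += MAX_CHUNK) {
            int end = Math.min(off + MAX_CHUNK, data.length);
            chunks.add(Arrays.copyOfRange(data, off, end));
        }
        return chunks;
    }

    // True while another packet may be written without exceeding the window.
    boolean canSend() {
        return inFlight < MAX_IN_FLIGHT;
    }

    // Records that one packet was written and is now awaiting its ack.
    void onSent() {
        if (!canSend()) {
            throw new IllegalStateException("window full");
        }
        inFlight++;
    }

    // Records one ack; returns true if the window was full before this ack,
    // which is the moment to request more data from upstream.
    boolean onAck() {
        if (inFlight == 0) {
            throw new IllegalStateException("nothing in flight");
        }
        boolean wasFull = !canSend();
        inFlight--;
        return wasFull;
    }
}
```

The idea would be to call read() on the upstream channel only while canSend() is true, and to call it again when onAck() reports that the window has just reopened.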

> On 3. Mar 2017, at 15:49, Rogan Dawes <[email protected]> wrote:
> 
> So I have achieved what I was after, to a certain extent, by overriding 
> channelReadComplete() in my TCPCodec, and only calling 
> super.channelReadComplete() if the number of in flight packets is below the 
> threshold.
> 
> This is now working for the FileChannel class. This same approach does not 
> appear to work for an NioSocketChannel, though. 
> 
> And ... the reason for this is that my custom extension of FileChannel limits 
> reads to 60 bytes at a time by overriding available(), and I haven't figured 
> out how to do something similar for NioSocketChannel. So NioSocketChannel 
> reads e.g. 9000 bytes of available data from the socket, then passes that to 
> a ByteToMessageDecoder, which is obliged to process the whole amount of data, 
> resulting in 150 messages before my channelReadComplete() method is called.
> 
> Sigh! Any suggestions on how to constrain how many bytes are read from the 
> socket at once?
> 
> Regards,
> 
> Rogan
> 
> 
> On Fri, Mar 3, 2017 at 3:25 PM Rogan Dawes <[email protected]> wrote:
> So I put some code back in to trigger a read() on the channel only if the 
> number of "in flight" packets is below the threshold. I also made sure to set 
> the channel autoread to false. However, it keeps reading regardless. This 
> seems to be the result of this code path:
> 
> Thread [pool-2-thread-1] (Suspended (breakpoint at line 111 in AbstractOioChannel))
>       Main$ChunkedFileChannel(AbstractOioChannel).setReadPending(boolean) line: 111
>       Main$ChunkedFileChannel(AbstractOioChannel).doBeginRead() line: 100
>       FileChannel$FileChannelUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 756
>       DefaultChannelPipeline$HeadContext.read(ChannelHandlerContext) line: 1273
>       ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 150
>       DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 309
>       AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeRead(ChannelHandlerContext) line: 127
>       DefaultChannelHandlerContext(AbstractChannelHandlerContext).read() line: 526
>       DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 433
>       TCPCodec(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) line: 155
>       ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) line: 92
>       DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) line: 167
>       AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) line: 91
>       DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 412
>       ChunkedFrameDecoder(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) line: 155
>       ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) line: 92
>       DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) line: 167
>       AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) line: 91
>       DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 412
>       DefaultChannelPipeline.fireChannelReadComplete() line: 962
>       Main$ChunkedFileChannel(AbstractOioByteChannel).doRead() line: 153
>       AbstractOioChannel$1.run() line: 44
>       ThreadPerChannelEventLoop.run() line: 52
>       SingleThreadEventExecutor$2.run() line: 116
>       Thread.run() line: 745
> 
> As expected my manual invocation of read() sets readPending to true, then 
> triggers the AbstractOioChannel readTask, which does a read, sets readPending 
> to false, and calls fireChannelReadComplete(). So far, so good.
> 
> However, I set a breakpoint on setReadPending (resulting in this stack 
> trace) and saw that read() was being invoked again, independently of my own 
> code.
> 
> This appears to be the result of:
> 
>       DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 433
> 
> which calls read() again because of this code:
> 
>         /**
>          * At this point, we are sure the handler of this context did not produce anything and
>          * we suppressed the {@code channelReadComplete()} event.
>          *
>          * If the next handler invoked {@link #read()} to read something but nothing was produced
>          * by the handler of this context, someone has to issue another {@link #read()} operation
>          * until the handler of this context produces something.
>          *
>          * Why? Because otherwise the next handler will not receive {@code channelRead()} nor
>          * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
>          */
>         if (invokedPrevRead && !channel().config().isAutoRead()) {
>             /**
>              * The next (or upstream) handler invoked {@link #read()}, but it didn't get any
>              * {@code channelRead()} event. We should read once more, so that the handler of the current
>              * context have a chance to produce something.
>              */
>             read();
>         } else {
>             invokedPrevRead = false;
>         }
> 
> However, a message was produced, so I don't really understand what is going 
> on here.
> 
> 
> 
> Rogan
> 
> 
> 
> 
> On Thursday, March 2, 2017 at 4:35:10 PM UTC+2, Rogan Dawes wrote:
> Hi Norman,
> 
> The code is available here:
> 
> https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy
> 
> The important stuff is in Main, since there is not actually a lot going on.
> 
> I have removed all the code that handles slowing the stream down, as I was 
> unable to figure out how to get it to work, but you can see the FileChannel 
> implementation, at least. I have had some interim success by overriding 
> OioByteStreamChannel.available(), and limiting it to 60 bytes max, so now I 
> need to reintroduce the limits on the number of packets in flight. I'm not 
> sure if this is the right way, and also not sure of the best way to do it for 
> a SocketChannel.
> 
> I was actually thinking that using a custom ByteBufAllocator that only 
> creates ByteBufs with a max capacity of 60 bytes would be another approach 
> that would work for both types of channel. What do you think of this idea?
> 
> Rogan
> 
> 
> On Thu, Mar 2, 2017 at 3:19 PM 'Norman Maurer' via Netty discussions 
> <[email protected]> wrote:
> The solution would be to ensure that read() can only produce a limited number 
> of bytes. I am not sure I understand how you implemented this, though.
> 
> 
> 
>> On 2. Mar 2017, at 13:34, Rogan Dawes <[email protected]> wrote:
>> 
>> Nobody got any hints for me on this?
>> 
>> I tried spinning in the ChannelHandler, waiting for the ack count to 
>> increment, but that simply blocked handling of any incoming messages 
>> containing the incremented ack :-(
>> 
>> Is there no way to defer handling of a message until a later point in time, 
>> triggered by performing a read(), for example?
>> 
>> Rogan
>> 
>> 
>> On Tue, Feb 28, 2017 at 8:56 AM Rogan Dawes <[email protected]> wrote:
>> So I have one major problem that is getting in the way, and that is managing 
>> flow control over the multiplexed link.
>> 
>> I send a packet with a particular sequence number, and the receiver must ACK 
>> it, much like TCP. Each packet may be a maximum of 60 bytes, and I can have 
>> a maximum of 14 packets in flight at any one time (i.e. unacked), for 
>> performance reasons. In theory, this should allow for retransmission of any 
>> missing packets, but in reality, this results in flow control, which is also 
>> desirable.
>> 
>> However, the one flow control aspect I don't have a good handle on is how to 
>> stop reading from the upstream channel. In one case, the Channel is a file, 
>> and the ByteBuf that is read contains the entire contents, being 10000 
>> bytes. I have a "chunking handler" that chops this up into 60-byte chunks, 
>> which is fine; however, the thread tries to write all 167 chunks one after 
>> the other, without giving the receiver a chance to ack.
>> 
>> How can I tell the pipeline to stop feeding me ByteBufs for the moment, but 
>> to resume when I call read() on the channel? AutoRead is already false, but 
>> that doesn't seem to help, because the very first read() results in far too 
>> much data being available, and it "must" be processed!
>> 
>> Thanks!
>> 
>> Rogan
>> 
>> 
>> On Monday, February 20, 2017 at 9:26:14 AM UTC+2, Rogan Dawes wrote:
>> Thanks for the response, it's good to know that I am at least on the right 
>> track.
>> 
>> Rogan
>> 
>> 
>> On Wed, Feb 15, 2017 at 5:32 PM Leonardo Gomes <[email protected]> wrote:
>> Hi Rogan,
>> 
>> That sounds like a good structure. You may want to check the Proxy example 
>> as well: 
>> 
>> https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy
>> 
>> You may want to look into MessageToByteEncoder as well.
>> 
>> I've written a similar thing, but with a pool of channels (using Netty's 
>> pool) as I didn't have any requirement to write on a specific outbound 
>> socket, but could take any from a pool of sockets I ended up creating.
>> 
>> Cheers,
>> Leo.
>> 
>> 
>> On Tue, Feb 14, 2017 at 4:58 PM, Rogan Dawes <[email protected]> wrote:
>> Hi,
>> 
>> I'm trying to use netty to implement a multiplexer/demultiplexer. Think 
>> multiple streams travelling together over a single socket, then getting 
>> spread out into individual sockets in my code.
>> 
>> I'm basing it on the example socks proxy code, which seems to have the 
>> basics in place (server and client together). However, I'm trying to figure 
>> out how best to deal with a couple of requirements:
>> 
>> The incoming traffic (call it downstream) has a stream number, which is used 
>> to identify which stream it belongs to, followed by some flags, sequence 
>> numbers, a length byte, and 60 bytes of data (of which "length" are usable). 
>> Depending on the flags, I will have to open a new client socket, and link it 
>> to the relevant stream number.
>> 
>> It seems to me that the server pipeline will have a simple handler that 
>> essentially looks up the channel/pipeline associated with a particular 
>> stream, or, if it does not exist, creates it. Each "client pipeline" (call 
>> it upstream) will have a first handler that knows how to process the flags, 
>> verify the sequence numbers, and extract the actual data, then write that 
>> data into the pipeline. It seems like this could be implemented as a 
>> MessageToMessageCodec, right? Packet comes in, ByteBuf of actual data goes 
>> out, to be written to the stream.
>> 
>> In the reverse direction, any data coming from upstream will have to be 
>> broken up into chunks of max 60 bytes, then put into a "Packet" that simply 
>> gets written to the downstream channel.
>> 
>> Does this make sense? Does it seem like a reasonable way of structuring the 
>> pipelines?
>> 
>> Thanks for your help.
>> 
>> Rogan
>> 
>> 
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Netty discussions" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected].
>> 
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com.
>> For more options, visit https://groups.google.com/d/optout.
>> 

