So I put some code back in to trigger a read() on the channel only if the 
number of "in flight" packets is below the threshold. I also made sure to 
set the channel autoread to false. However, it continues to read 
continuously, regardless of this. This seems to be as a result of this code 
path:

Thread [pool-2-thread-1] (Suspended (breakpoint at line 111 in 
AbstractOioChannel)) 
Main$ChunkedFileChannel(AbstractOioChannel).setReadPending(boolean) line: 
111 
Main$ChunkedFileChannel(AbstractOioChannel).doBeginRead() line: 100 
FileChannel$FileChannelUnsafe(AbstractChannel$AbstractUnsafe).beginRead() 
line: 756 
DefaultChannelPipeline$HeadContext.read(ChannelHandlerContext) line: 1273 
ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 150 
DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 309 
AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeRead(ChannelHandlerContext)
 
line: 127 
DefaultChannelHandlerContext(AbstractChannelHandlerContext).read() line: 526 
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete()
 
line: 433 
TCPCodec(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) 
line: 155 
ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) 
line: 92 
DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) 
line: 167 
AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext)
 
line: 91 
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete()
 
line: 412 
ChunkedFrameDecoder(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext)
 
line: 155 
ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) 
line: 92 
DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) 
line: 167 
AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext)
 
line: 91 
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelReadComplete()
 
line: 412 
DefaultChannelPipeline.fireChannelReadComplete() line: 962 
Main$ChunkedFileChannel(AbstractOioByteChannel).doRead() line: 153 
AbstractOioChannel$1.run() line: 44 
ThreadPerChannelEventLoop.run() line: 52 
SingleThreadEventExecutor$2.run() line: 116 
Thread.run() line: 745 

As expected my manual invocation of read() sets readPending to true, then 
triggers the AbstractOioChannel readTask, which does a read, sets 
readPending to false, and calls fireChannelReadComplete(). So far, so good.

However, I set a breakpoint on setReadPending, (resulting in this stack 
trace), and saw that read() was being invoked again, independently of my 
own code.

This appears to be as a result of:

DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete()
 
line: 433 

which calls read() again, as a result of this code:

        /**

         * At this point, we are sure the handler of this context did not 
produce anything and

         * we suppressed the {@code channelReadComplete()} event.

         *

         * If the next handler invoked {@link #read()} to read something 
but nothing was produced

         * by the handler of this context, someone has to issue another {@link 
#read()} operation

         * until the handler of this context produces something.

         *

         * Why? Because otherwise the next handler will not receive {@code 
channelRead()} nor

         * {@code channelReadComplete()} event at all for the {@link 
#read()} operation it issued.

         */

        if (invokedPrevRead && !channel().config().isAutoRead()) {

            /**

             * The next (or upstream) handler invoked {@link #read()}, but 
it didn't get any

             * {@code channelRead()} event. We should read once more, so 
that the handler of the current

             * context have a chance to produce something.

             */

            read();

        } else {

            invokedPrevRead = false;

        }


However, a message was produced, so I don't really understand what is going 
on here.


Rogan



On Thursday, March 2, 2017 at 4:35:10 PM UTC+2, Rogan Dawes wrote:
>
> Hi Norman,
>
> The code is available here:
>
>
> https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy
>
> The important stuff is in Main, since there is not actually a lot going on.
>
> I have removed all the code that handles slowing the stream down, as I was 
> unable to figure out how to get it to work, but you can see the FileChannel 
> implementation, at least. I have had some interim success by overriding 
> OioByteStreamChannel.available(), and limiting it to 60 bytes max, so now I 
> need to reintroduce the limits on the number of packets in flight. I'm not 
> sure if this is the right way, and also not sure of the best way to do it 
> for a SocketChannel.
>
> I was actually thinking that using a custom ByteBufAllocator that only 
> creates ByteBuf's with a max capacity of 60 bytes would be another 
> approach, that would work for both types of channel. What do you think of 
> this idea?
>
> Rogan
>
>
> On Thu, Mar 2, 2017 at 3:19 PM 'Norman Maurer' via Netty discussions <
> [email protected] <javascript:>> wrote:
>
>> The solution would be to ensure the read() can only produce a limited 
>> number of bytes. I am not sure I understand how you implemented this kind 
>> of stuff tho.
>>
>>
>> On 2. Mar 2017, at 13:34, Rogan Dawes <[email protected] <javascript:>> 
>> wrote:
>>
>> Nobody got any hints for me on this?
>>
>> I tried spinning in the ChannelHandler, waiting for the ack count to 
>> increment, but that simply blocked handling of any incoming messages 
>> containing the incremented ack :-(
>>
>> Is there no way to defer handling of a message until a later point in 
>> time, triggered by performing a read(), for example?
>>
>> Rogan
>>
>>
>> On Tue, Feb 28, 2017 at 8:56 AM Rogan Dawes <[email protected] 
>> <javascript:>> wrote:
>>
>>> So I have one major problem that is getting in the way, and that is 
>>> managing flow control over the multiplexed link.
>>>
>>> I send a packet with a particular sequence number, and the receiver must 
>>> ACK it, much like TCP. Each packet may be a maximum of 60 bytes, and I can 
>>> have a maximum of 14 packets in flight at any one time (i.e. unacked), for 
>>> performance reasons. In theory, this should allow for retransmission of any 
>>> missing packets, but in reality, this results in flow control, which is 
>>> also desirable.
>>>
>>> However, the one flow control aspect I don't have a good handle on is 
>>> how to stop reading from the upstream channel. In one case, the Channel is 
>>> a file, and the ByteBuf that is read contains the entire contents, being 
>>> 10000 bytes. I have a "chunking handler" that chops this up into 60 byte 
>>> chunks, which is fine, however, the thread tries to write all 167 chunks 
>>> one after the other, without giving the receiver a chance to ack.
>>>
>>> How can I tell the pipeline to stop feeding me ByteBufs for the moment, 
>>> but to resume when I call read() on the channel? AutoRead is already false, 
>>> but that doesn't seem to help, because the very first read() results in far 
>>> too much data being available, and it "must" be processed!
>>>
>>> Thanks!
>>>
>>> Rogan
>>>
>>>
>>> On Monday, February 20, 2017 at 9:26:14 AM UTC+2, Rogan Dawes wrote:
>>>
>>>> Thanks for the response, it's good to know that I am at least on the 
>>>> right track.
>>>>
>>>> Rogan
>>>>
>>>>
>>>> On Wed, Feb 15, 2017 at 5:32 PM Leonardo Gomes <[email protected]> 
>>>> wrote:
>>>>
>>> Hi Rogan,
>>>>>
>>>>> That sounds like a good structure. You may want to check the Proxy 
>>>>> example as well: 
>>>>>
>>>>>
>>>>> https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy
>>>>>
>>>>> You may want to look into MessageToByteEncoder as well.
>>>>>
>>>>> I've written a similar thing, but with a pool of channels (using 
>>>>> Netty's pool) as I didn't have any requirement to write on a specific 
>>>>> outbound socket, but could take any from a pool of sockets I ended up 
>>>>> creating.
>>>>>
>>>>> Cheers,
>>>>> Leo.
>>>>>
>>>>>
>>>>> On Tue, Feb 14, 2017 at 4:58 PM, Rogan Dawes <[email protected]> 
>>>>> wrote:
>>>>>
>>>> Hi,
>>>>>>
>>>>>> I'm trying to use netty to implement a multiplexer/demultiplexer. 
>>>>>> Think multiple streams travelling together over a single socket, then 
>>>>>> getting spread out into individual sockets in my code.
>>>>>>
>>>>>> I'm basing it on the example socks proxy code, which seems to have 
>>>>>> the basics in place (server and client together). However, I'm trying to 
>>>>>> figure out how best to deal with a couple of requirements:
>>>>>>
>>>>>> The incoming traffic (call it downstream) has a stream number, which 
>>>>>> is used to identify which stream it belongs to, followed by some flags, 
>>>>>> sequence numbers, a length byte, and 60 bytes of data (of which "length" 
>>>>>> are usable). Depending on the flags, I will have to open a new client 
>>>>>> socket, and link it to the relevant stream number.
>>>>>>
>>>>>> It seems to me that the server pipeline will have a simple handler 
>>>>>> that essentially looks up the channel/pipeline associated with a 
>>>>>> particular 
>>>>>> stream, or, if it does not exist, creates it. Each "client pipeline" 
>>>>>> (call 
>>>>>> it upstream) will have a first handler that knows how to process the 
>>>>>> flags, 
>>>>>> verify the sequence numbers, and extract the actual data, then write 
>>>>>> that 
>>>>>> data into the pipeline. It seems like this could be implemented as a 
>>>>>> MessageToMessageCodec, right? Packet comes in, ByteBuf of actual data 
>>>>>> goes 
>>>>>> out, to be written to the stream.
>>>>>>
>>>>>> In the reverse direction, any data coming from upstream will have to 
>>>>>> be broken up into chunks of max 60 bytes, then put into a "Packet" that 
>>>>>> simply gets written to the downstream channel.
>>>>>>
>>>>>> Does this make sense? Does it seem like a reasonable way of 
>>>>>> structuring the pipelines?
>>>>>>
>>>>>> Thanks for your help.
>>>>>>
>>>>>> Rogan
>>>>>>
>>>>>>
>>>>>> -- 
>>>>>> You received this message because you are subscribed to the Google 
>>>>>> Groups "Netty discussions" group.
>>>>>>
>>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>>>> an email to [email protected].
>>>>>
>>>>>
>>>>>> To view this discussion on the web visit 
>>>>>> https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com
>>>>>>  
>>>>>> <https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>>>> .
>>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>>
>>>>>
>>>>> -- 
>>>>> You received this message because you are subscribed to the Google 
>>>>> Groups "Netty discussions" group.
>>>>>
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>>> an email to [email protected].
>>>>
>>>>
>>>>> To view this discussion on the web visit 
>>>>> https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com
>>>>>  
>>>>> <https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com?utm_medium=email&utm_source=footer>
>>>>> .
>>>>> For more options, visit https://groups.google.com/d/optout.
>>>>>
>>>>
>>> -- 
>>> You received this message because you are subscribed to the Google 
>>> Groups "Netty discussions" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to [email protected] <javascript:>.
>>> To view this discussion on the web visit 
>>> https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com
>>>  
>>> <https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com?utm_medium=email&utm_source=footer>
>>> .
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Netty discussions" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected] <javascript:>.
>>
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com
>>  
>> <https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com?utm_medium=email&utm_source=footer>
>> .
>>
>>
>> For more options, visit https://groups.google.com/d/optout.
>>
>> -- 
>> You received this message because you are subscribed to the Google Groups 
>> "Netty discussions" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to [email protected] <javascript:>.
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com
>>  
>> <https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com?utm_medium=email&utm_source=footer>
>> .
>> For more options, visit https://groups.google.com/d/optout.
>>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to