So I have achieved what I was after, to a certain extent, by overriding
channelReadComplete() in my TCPCodec, and only calling
super.channelReadComplete() if the number of in flight packets is below the
threshold.

This is now working for the FileChannel class. This same approach does not
appear to work for an NioSocketChannel, though.

And ... the reason for this is that my custom extension of FileChannel
limits reads to 60 bytes at a time by overriding available(), and I haven't
figured out how to do something similar for NioSocketChannel. So
NioSocketChannel reads e.g. 9000 bytes of available data from the socket,
then passes that to a ByteToMessageDecoder, which is obliged to process the
whole amount of data, resulting in 150 messages before my
channelReadComplete() method is called.

Sigh! Any suggestions on how to constrain how many bytes are read from the
socket at once?

Regards,

Rogan


On Fri, Mar 3, 2017 at 3:25 PM Rogan Dawes <[email protected]> wrote:

So I put some code back in to trigger a read() on the channel only if the
number of "in flight" packets is below the threshold. I also made sure to
set the channel autoread to false. However, it continues to read
continuously, regardless of this. This seems to be as a result of this code
path:

Thread [pool-2-thread-1] (Suspended (breakpoint at line 111 in
AbstractOioChannel))
Main$ChunkedFileChannel(AbstractOioChannel).setReadPending(boolean) line:
111
Main$ChunkedFileChannel(AbstractOioChannel).doBeginRead() line: 100
FileChannel$FileChannelUnsafe(AbstractChannel$AbstractUnsafe).beginRead()
line: 756
DefaultChannelPipeline$HeadContext.read(ChannelHandlerContext) line: 1273
ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 150
DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 309
AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeRead(ChannelHandlerContext)
line: 127
DefaultChannelHandlerContext(AbstractChannelHandlerContext).read() line: 526
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete()
line: 433
TCPCodec(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext)
line: 155
ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext)
line: 92
DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext)
line: 167
AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext)
line: 91
DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete()
line: 412
ChunkedFrameDecoder(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext)
line: 155
ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext)
line: 92
DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext)
line: 167
AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext)
line: 91
DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelReadComplete()
line: 412
DefaultChannelPipeline.fireChannelReadComplete() line: 962
Main$ChunkedFileChannel(AbstractOioByteChannel).doRead() line: 153
AbstractOioChannel$1.run() line: 44
ThreadPerChannelEventLoop.run() line: 52
SingleThreadEventExecutor$2.run() line: 116
Thread.run() line: 745

As expected my manual invocation of read() sets readPending to true, then
triggers the AbstractOioChannel readTask, which does a read, sets
readPending to false, and calls fireChannelReadComplete(). So far, so good.

However, I set a breakpoint on setReadPending, (resulting in this stack
trace), and saw that read() was being invoked again, independently of my
own code.

This appears to be as a result of:

DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete()
line: 433

which calls read() again, as a result of this code:

        /**

         * At this point, we are sure the handler of this context did not
produce anything and

         * we suppressed the {@code channelReadComplete()} event.

         *

         * If the next handler invoked {@link #read()} to read something
but nothing was produced

         * by the handler of this context, someone has to issue another {@link
#read()} operation

         * until the handler of this context produces something.

         *

         * Why? Because otherwise the next handler will not receive {@code
channelRead()} nor

         * {@code channelReadComplete()} event at all for the {@link
#read()} operation it issued.

         */

        if (invokedPrevRead && !channel().config().isAutoRead()) {

            /**

             * The next (or upstream) handler invoked {@link #read()}, but
it didn't get any

             * {@code channelRead()} event. We should read once more, so
that the handler of the current

             * context have a chance to produce something.

             */

            read();

        } else {

            invokedPrevRead = false;

        }


However, a message was produced, so I don't really understand what is going
on here.


Rogan



On Thursday, March 2, 2017 at 4:35:10 PM UTC+2, Rogan Dawes wrote:

Hi Norman,

The code is available here:

https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy

The important stuff is in Main, since there is not actually a lot going on.

I have removed all the code that handles slowing the stream down, as I was
unable to figure out how to get it to work, but you can see the FileChannel
implementation, at least. I have had some interim success by overriding
OioByteStreamChannel.available(), and limiting it to 60 bytes max, so now I
need to reintroduce the limits on the number of packets in flight. I'm not
sure if this is the right way, and also not sure of the best way to do it
for a SocketChannel.

I was actually thinking that using a custom ByteBufAllocator that only
creates ByteBuf's with a max capacity of 60 bytes would be another
approach, that would work for both types of channel. What do you think of
this idea?

Rogan


On Thu, Mar 2, 2017 at 3:19 PM 'Norman Maurer' via Netty discussions <
[email protected]> wrote:

The solution would be to ensure the read() can only produce a limited
number of bytes. I am not sure I understand how you implemented this kind
of stuff tho.


On 2. Mar 2017, at 13:34, Rogan Dawes <[email protected]> wrote:

Nobody got any hints for me on this?

I tried spinning in the ChannelHandler, waiting for the ack count to
increment, but that simply blocked handling of any incoming messages
containing the incremented ack :-(

Is there no way to defer handling of a message until a later point in time,
triggered by performing a read(), for example?

Rogan


On Tue, Feb 28, 2017 at 8:56 AM Rogan Dawes <[email protected]> wrote:

So I have one major problem that is getting in the way, and that is
managing flow control over the multiplexed link.

I send a packet with a particular sequence number, and the receiver must
ACK it, much like TCP. Each packet may be a maximum of 60 bytes, and I can
have a maximum of 14 packets in flight at any one time (i.e. unacked), for
performance reasons. In theory, this should allow for retransmission of any
missing packets, but in reality, this results in flow control, which is
also desirable.

However, the one flow control aspect I don't have a good handle on is how
to stop reading from the upstream channel. In one case, the Channel is a
file, and the ByteBuf that is read contains the entire contents, being
10000 bytes. I have a "chunking handler" that chops this up into 60 byte
chunks, which is fine, however, the thread tries to write all 167 chunks
one after the other, without giving the receiver a chance to ack.

How can I tell the pipeline to stop feeding me ByteBufs for the moment, but
to resume when I call read() on the channel? AutoRead is already false, but
that doesn't seem to help, because the very first read() results in far too
much data being available, and it "must" be processed!

Thanks!

Rogan


On Monday, February 20, 2017 at 9:26:14 AM UTC+2, Rogan Dawes wrote:

Thanks for the response, it's good to know that I am at least on the right
track.

Rogan


On Wed, Feb 15, 2017 at 5:32 PM Leonardo Gomes <[email protected]>
wrote:

Hi Rogan,

That sounds like a good structure. You may want to check the Proxy example
as well:

https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy

You may want to look into MessageToByteEncoder as well.

I've written a similar thing, but with a pool of channels (using Netty's
pool) as I didn't have any requirement to write on a specific outbound
socket, but could take any from a pool of sockets I ended up creating.

Cheers,
Leo.


On Tue, Feb 14, 2017 at 4:58 PM, Rogan Dawes <[email protected]> wrote:

Hi,

I'm trying to use netty to implement a multiplexer/demultiplexer. Think
multiple streams travelling together over a single socket, then getting
spread out into individual sockets in my code.

I'm basing it on the example socks proxy code, which seems to have the
basics in place (server and client together). However, I'm trying to figure
out how best to deal with a couple of requirements:

The incoming traffic (call it downstream) has a stream number, which is
used to identify which stream it belongs to, followed by some flags,
sequence numbers, a length byte, and 60 bytes of data (of which "length"
are usable). Depending on the flags, I will have to open a new client
socket, and link it to the relevant stream number.

It seems to me that the server pipeline will have a simple handler that
essentially looks up the channel/pipeline associated with a particular
stream, or, if it does not exist, creates it. Each "client pipeline" (call
it upstream) will have a first handler that knows how to process the flags,
verify the sequence numbers, and extract the actual data, then write that
data into the pipeline. It seems like this could be implemented as a
MessageToMessageCodec, right? Packet comes in, ByteBuf of actual data goes
out, to be written to the stream.

In the reverse direction, any data coming from upstream will have to be
broken up into chunks of max 60 bytes, then put into a "Packet" that simply
gets written to the downstream channel.

Does this make sense? Does it seem like a reasonable way of structuring the
pipelines?

Thanks for your help.

Rogan


-- 
You received this message because you are subscribed to the Google Groups
"Netty discussions" group.

To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].


To view this discussion on the web visit
https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com
<https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com?utm_medium=email&utm_source=footer>
.
For more options, visit https://groups.google.com/d/optout.


-- 
You received this message because you are subscribed to the Google Groups
"Netty discussions" group.

To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].


To view this discussion on the web visit
https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com
<https://groups.google.com/d/msgid/netty/CAEAdJ9ruupfn-7fXvWi1vhW8yg0-DkWFBAqmjymPYRYf%3D%2BfAkA%40mail.gmail.com?utm_medium=email&utm_source=footer>
.
For more options, visit https://groups.google.com/d/optout.


-- 
You received this message because you are subscribed to the Google Groups
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].

To view this discussion on the web visit
https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com
<https://groups.google.com/d/msgid/netty/06b2eeaf-b212-4b2c-b738-c89cb5b3d2e1%40googlegroups.com?utm_medium=email&utm_source=footer>
.
For more options, visit https://groups.google.com/d/optout.


-- 
You received this message because you are subscribed to the Google Groups
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].

To view this discussion on the web visit
https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com
<https://groups.google.com/d/msgid/netty/CAOYdKdhQSKeoGhyUJN2RAE_yubY2Dw0SoUYq8E9kygUOYn51qA%40mail.gmail.com?utm_medium=email&utm_source=footer>
.


For more options, visit https://groups.google.com/d/optout.

-- 
You received this message because you are subscribed to the Google Groups
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com
<https://groups.google.com/d/msgid/netty/D7BCE665-CEA1-4FA8-84B3-66991FCE1EC6%40googlemail.com?utm_medium=email&utm_source=footer>
.
For more options, visit https://groups.google.com/d/optout.

-- 
You received this message because you are subscribed to the Google Groups
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].
To view this discussion on the web visit
https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com
<https://groups.google.com/d/msgid/netty/e93aea0f-4230-42d3-b9c1-f12d7d2a405b%40googlegroups.com?utm_medium=email&utm_source=footer>
.
For more options, visit https://groups.google.com/d/optout.

-- 
You received this message because you are subscribed to the Google Groups 
"Netty discussions" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To view this discussion on the web visit 
https://groups.google.com/d/msgid/netty/CAOYdKdhwd3h9fK5%3De-gLQNqN%3D9KLu3sVpWqwXD7TnX8d0%2BRmmQ%40mail.gmail.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to