Excellent! I'll try that! Thank you!

Rogan

On Fri, 03 Mar 2017 at 7:02 PM 'Norman Maurer' via Netty discussions <
[email protected]> wrote:

> You could just use the `FixedRecvByteBufAllocator` and set it to the max
> number of bytes you want to read.
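
To illustrate what a fixed-size receive buffer achieves, here is a minimal sketch in plain java.nio rather than Netty. Each read fills a buffer of at most 60 bytes, so no single read hands more than one packet's worth of data downstream. In Netty the corresponding setting would be something like new FixedRecvByteBufAllocator(60) passed via ChannelOption.RCVBUF_ALLOCATOR. The class name BoundedReadDemo and the 9000-byte in-memory source are assumptions made purely for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: not part of Netty or of the code discussed in this thread.
final class BoundedReadDemo {

    /** Drains the channel with a fixed-size buffer and records the size of each read. */
    static List<Integer> readSizes(ReadableByteChannel ch, int maxBytesPerRead) {
        List<Integer> sizes = new ArrayList<>();
        // The buffer capacity is the cap: read() can never return more than this.
        ByteBuffer buf = ByteBuffer.allocate(maxBytesPerRead);
        try {
            int n;
            while ((n = ch.read(buf)) != -1) {
                if (n > 0) {
                    sizes.add(n);
                }
                buf.clear(); // reuse the same buffer for the next bounded read
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Stand-in for a socket with 9000 bytes available (assumed figure from the thread).
        ReadableByteChannel ch =
                Channels.newChannel(new ByteArrayInputStream(new byte[9000]));
        List<Integer> sizes = readSizes(ch, 60);
        System.out.println(sizes.size() + " reads, largest allowed read is 60 bytes");
    }
}
```

Note that Netty may still perform several such reads in one read loop before firing channelReadComplete(), so the number of reads per loop may also need to be limited. Treat that as something to verify against the Netty version in use.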
>
>
> On 3. Mar 2017, at 15:49, Rogan Dawes <[email protected]> wrote:
>
> So I have achieved what I was after, to a certain extent, by overriding
> channelReadComplete() in my TCPCodec, and only calling
> super.channelReadComplete() if the number of in flight packets is below the
> threshold.
>
> This is now working for the FileChannel class. This same approach does not
> appear to work for an NioSocketChannel, though.
>
> And ... the reason for this is that my custom extension of FileChannel
> limits reads to 60 bytes at a time by overriding available(), and I haven't
> figured out how to do something similar for NioSocketChannel. So
> NioSocketChannel reads e.g. 9000 bytes of available data from the socket,
> then passes that to a ByteToMessageDecoder, which is obliged to process the
> whole amount of data, resulting in 150 messages before my
> channelReadComplete() method is called.
>
> Sigh! Any suggestions on how to constrain how many bytes are read from the
> socket at once?
>
> Regards,
>
> Rogan
>
>
> On Fri, Mar 3, 2017 at 3:25 PM Rogan Dawes <[email protected]> wrote:
>
> So I put some code back in to trigger a read() on the channel only if the
> number of "in flight" packets is below the threshold. I also made sure to
> set autoRead to false on the channel. However, the channel keeps reading
> continuously regardless. This seems to be the result of this code path:
>
> Thread [pool-2-thread-1] (Suspended (breakpoint at line 111 in AbstractOioChannel))
> Main$ChunkedFileChannel(AbstractOioChannel).setReadPending(boolean) line: 111
> Main$ChunkedFileChannel(AbstractOioChannel).doBeginRead() line: 100
> FileChannel$FileChannelUnsafe(AbstractChannel$AbstractUnsafe).beginRead() line: 756
> DefaultChannelPipeline$HeadContext.read(ChannelHandlerContext) line: 1273
> ChannelHandlerInvokerUtil.invokeReadNow(ChannelHandlerContext) line: 150
> DefaultChannelHandlerInvoker.invokeRead(ChannelHandlerContext) line: 309
> AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeRead(ChannelHandlerContext) line: 127
> DefaultChannelHandlerContext(AbstractChannelHandlerContext).read() line: 526
> DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 433
> TCPCodec(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) line: 155
> ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) line: 92
> DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) line: 167
> AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) line: 91
> DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 412
> ChunkedFrameDecoder(ChannelHandlerAdapter).channelReadComplete(ChannelHandlerContext) line: 155
> ChannelHandlerInvokerUtil.invokeChannelReadCompleteNow(ChannelHandlerContext) line: 92
> DefaultChannelHandlerInvoker.invokeChannelReadComplete(ChannelHandlerContext) line: 167
> AbstractChannel$PausableChannelEventLoop(PausableChannelEventExecutor).invokeChannelReadComplete(ChannelHandlerContext) line: 91
> DefaultChannelPipeline$HeadContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 412
> DefaultChannelPipeline.fireChannelReadComplete() line: 962
> Main$ChunkedFileChannel(AbstractOioByteChannel).doRead() line: 153
> AbstractOioChannel$1.run() line: 44
> ThreadPerChannelEventLoop.run() line: 52
> SingleThreadEventExecutor$2.run() line: 116
> Thread.run() line: 745
>
> As expected my manual invocation of read() sets readPending to true, then
> triggers the AbstractOioChannel readTask, which does a read, sets
> readPending to false, and calls fireChannelReadComplete(). So far, so good.
>
> However, I set a breakpoint on setReadPending (resulting in this stack
> trace) and saw that read() was being invoked again, independently of my
> own code.
>
> This appears to be the result of:
>
> DefaultChannelHandlerContext(AbstractChannelHandlerContext).fireChannelReadComplete() line: 433
>
> which calls read() again because of this code:
>
>         /**
>          * At this point, we are sure the handler of this context did not produce anything and
>          * we suppressed the {@code channelReadComplete()} event.
>          *
>          * If the next handler invoked {@link #read()} to read something but nothing was produced
>          * by the handler of this context, someone has to issue another {@link #read()} operation
>          * until the handler of this context produces something.
>          *
>          * Why? Because otherwise the next handler will not receive {@code channelRead()} nor
>          * {@code channelReadComplete()} event at all for the {@link #read()} operation it issued.
>          */
>         if (invokedPrevRead && !channel().config().isAutoRead()) {
>             /**
>              * The next (or upstream) handler invoked {@link #read()}, but it didn't get any
>              * {@code channelRead()} event. We should read once more, so that the handler of the current
>              * context have a chance to produce something.
>              */
>             read();
>         } else {
>             invokedPrevRead = false;
>         }
>
>
> However, a message was produced, so I don't really understand what is
> going on here.
>
>
> Rogan
>
>
>
> On Thursday, March 2, 2017 at 4:35:10 PM UTC+2, Rogan Dawes wrote:
>
> Hi Norman,
>
> The code is available here:
>
>
> https://github.com/sensepost/USaBUSe/tree/master/HIDProxy/src/main/java/com/sensepost/hidproxy
>
> The important stuff is in Main, since there is not actually a lot going on.
>
> I have removed all the code that handles slowing the stream down, as I was
> unable to figure out how to get it to work, but you can see the FileChannel
> implementation, at least. I have had some interim success by overriding
> OioByteStreamChannel.available(), and limiting it to 60 bytes max, so now I
> need to reintroduce the limits on the number of packets in flight. I'm not
> sure if this is the right way, and also not sure of the best way to do it
> for a SocketChannel.
>
> I was actually thinking that using a custom ByteBufAllocator that only
> creates ByteBufs with a max capacity of 60 bytes would be another
> approach that would work for both types of channel. What do you think of
> this idea?
>
> Rogan
>
>
> On Thu, Mar 2, 2017 at 3:19 PM 'Norman Maurer' via Netty discussions <
> [email protected]> wrote:
>
> The solution would be to ensure the read() can only produce a limited
> number of bytes. I am not sure I understand how you implemented this kind
> of stuff, though.
>
>
> On 2. Mar 2017, at 13:34, Rogan Dawes <[email protected]> wrote:
>
> Nobody got any hints for me on this?
>
> I tried spinning in the ChannelHandler, waiting for the ack count to
> increment, but that simply blocked handling of any incoming messages
> containing the incremented ack :-(
>
> Is there no way to defer handling of a message until a later point in
> time, triggered by performing a read(), for example?
>
> Rogan
>
>
> On Tue, Feb 28, 2017 at 8:56 AM Rogan Dawes <[email protected]> wrote:
>
> So I have one major problem that is getting in the way, and that is
> managing flow control over the multiplexed link.
>
> I send a packet with a particular sequence number, and the receiver must
> ACK it, much like TCP. Each packet may carry at most 60 bytes of data, and
> I can have at most 14 unacked packets in flight at any one time, for
> performance reasons. In theory, this should allow for retransmission of any
> missing packets; in practice, it acts as flow control, which is also
> desirable.
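
The in-flight limit described above can be sketched as a small send window. This is only an illustration under stated assumptions: the class name SendWindow is hypothetical, acks are treated as cumulative (acking sequence N releases everything up to N), and sequence number wraparound is ignored. None of those details are given in the thread.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: tracks unacked packets and caps how many may be in flight.
final class SendWindow {
    private final int maxInFlight;
    private final Deque<Integer> unacked = new ArrayDeque<>();
    private int nextSeq = 0; // assumption: no wraparound handling in this sketch

    SendWindow(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** True while another packet may be sent without exceeding the limit. */
    boolean canSend() {
        return unacked.size() < maxInFlight;
    }

    /** Registers one packet as sent and returns its sequence number. */
    int send() {
        if (!canSend()) {
            throw new IllegalStateException("window full: " + unacked.size() + " in flight");
        }
        int seq = nextSeq++;
        unacked.addLast(seq);
        return seq;
    }

    /** Cumulative ack (assumed): releases every packet up to and including seq. */
    int ack(int seq) {
        int released = 0;
        while (!unacked.isEmpty() && unacked.peekFirst() <= seq) {
            unacked.removeFirst();
            released++;
        }
        return released;
    }

    int inFlight() {
        return unacked.size();
    }
}
```

A handler could consult canSend() before requesting more data from the upstream channel, and request more whenever ack() returns a non-zero count.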
>
> However, the one flow control aspect I don't have a good handle on is how
> to stop reading from the upstream channel. In one case, the Channel is a
> file, and the ByteBuf that is read contains the entire contents, all
> 10000 bytes. I have a "chunking handler" that chops this up into 60-byte
> chunks, which is fine; however, the thread tries to write all 167 chunks
> one after the other, without giving the receiver a chance to ack.
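
The chunk count above follows from splitting 10000 bytes into pieces of at most 60 bytes: 166 full chunks plus one final chunk of 40 bytes. A minimal sketch of such a splitter, using plain byte arrays instead of ByteBuf (the class name Chunker is an assumption for the example):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: splits a byte array into chunks of at most maxChunk bytes.
final class Chunker {

    static List<byte[]> split(byte[] data, int maxChunk) {
        if (maxChunk <= 0) {
            throw new IllegalArgumentException("maxChunk must be positive");
        }
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < data.length; off += maxChunk) {
            // The last chunk may be shorter than maxChunk.
            int end = Math.min(off + maxChunk, data.length);
            chunks.add(Arrays.copyOfRange(data, off, end));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<byte[]> chunks = split(new byte[10000], 60);
        System.out.println(chunks.size() + " chunks, last one "
                + chunks.get(chunks.size() - 1).length + " bytes");
    }
}
```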
>
> How can I tell the pipeline to stop feeding me ByteBufs for the moment,
> but to resume when I call read() on the channel? AutoRead is already false,
> but that doesn't seem to help, because the very first read() results in far
> too much data being available, and it "must" be processed!
>
> Thanks!
>
> Rogan
>
>
> On Monday, February 20, 2017 at 9:26:14 AM UTC+2, Rogan Dawes wrote:
>
> Thanks for the response, it's good to know that I am at least on the right
> track.
>
> Rogan
>
>
> On Wed, Feb 15, 2017 at 5:32 PM Leonardo Gomes <[email protected]>
> wrote:
>
> Hi Rogan,
>
> That sounds like a good structure. You may want to check the Proxy example
> as well:
>
>
> https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/proxy
>
> You may want to look into MessageToByteEncoder as well.
>
> I've written a similar thing, but with a pool of channels (using Netty's
> pool) as I didn't have any requirement to write on a specific outbound
> socket, but could take any from a pool of sockets I ended up creating.
>
> Cheers,
> Leo.
>
>
> On Tue, Feb 14, 2017 at 4:58 PM, Rogan Dawes <[email protected]> wrote:
>
> Hi,
>
> I'm trying to use netty to implement a multiplexer/demultiplexer. Think
> multiple streams travelling together over a single socket, then getting
> spread out into individual sockets in my code.
>
> I'm basing it on the example socks proxy code, which seems to have the
> basics in place (server and client together). However, I'm trying to figure
> out how best to deal with a couple of requirements:
>
> The incoming traffic (call it downstream) has a stream number, which is
> used to identify which stream it belongs to, followed by some flags,
> sequence numbers, a length byte, and 60 bytes of data (of which "length"
> are usable). Depending on the flags, I will have to open a new client
> socket, and link it to the relevant stream number.
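
As a rough illustration of the packet layout just described, the sketch below decodes one packet from a ByteBuffer. The exact field widths are not given in this thread, so the layout used here (one byte each for stream number, flags, sequence number, ack number and length, followed by a fixed 60-byte data area) is purely an assumption, as is the class name MuxPacket.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical packet model; the field widths are assumed, not taken from the thread.
final class MuxPacket {
    static final int DATA_SIZE = 60;             // fixed data area per packet
    static final int WIRE_SIZE = 5 + DATA_SIZE;  // 5 assumed header bytes + data

    final int stream;
    final int flags;
    final int seq;
    final int ack;
    final byte[] payload; // only the first "length" bytes of the data area

    private MuxPacket(int stream, int flags, int seq, int ack, byte[] payload) {
        this.stream = stream;
        this.flags = flags;
        this.seq = seq;
        this.ack = ack;
        this.payload = payload;
    }

    /** Decodes one packet, consuming WIRE_SIZE bytes from the buffer. */
    static MuxPacket decode(ByteBuffer in) {
        if (in.remaining() < WIRE_SIZE) {
            throw new IllegalArgumentException("short packet: " + in.remaining());
        }
        int stream = in.get() & 0xFF;
        int flags = in.get() & 0xFF;
        int seq = in.get() & 0xFF;
        int ack = in.get() & 0xFF;
        int length = in.get() & 0xFF;
        if (length > DATA_SIZE) {
            throw new IllegalArgumentException("bad length: " + length);
        }
        byte[] data = new byte[DATA_SIZE];
        in.get(data); // always consume the full data area, usable or not
        return new MuxPacket(stream, flags, seq, ack, Arrays.copyOf(data, length));
    }
}
```

The server-side handler would then use the stream field to look up (or create) the matching client channel and forward only payload to it.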
>
> It seems to me that the server pipeline will have a simple handler that
> essentially looks up the channel/pipeline associated with a particular
> stream, or, if it does not exist, creates it. Each "client pipeline" (call
> it upstream) will have a first handler that knows how to process the flags,
> verify the sequence numbers, and extract the actual data, then write that
> data into the pipeline. It seems like this could be implemented as a
> MessageToMessageCodec, right? Packet comes in, ByteBuf of actual data goes
> out, to be written to the stream.
>
> In the reverse direction, any data coming from upstream will have to be
> broken up into chunks of max 60 bytes, then put into a "Packet" that simply
> gets written to the downstream channel.
>
> Does this make sense? Does it seem like a reasonable way of structuring
> the pipelines?
>
> Thanks for your help.
>
> Rogan
>
>
> --
> You received this message because you are subscribed to the Google Groups
> "Netty discussions" group.
>
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
>
>
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com
> <https://groups.google.com/d/msgid/netty/7e3124d3-9791-4d4c-8d6e-462735a473b4%40googlegroups.com?utm_medium=email&utm_source=footer>
> .
> For more options, visit https://groups.google.com/d/optout.
>
>

