Based on what you've posted, it looks to me like you need to factor Transport.capacity() into your code. Transport.capacity() returns the number of bytes you can currently pump into the transport, or Transport.END_OF_STREAM if it is no longer legal to pump in more bytes. The BufferOverflowException in your trace is consistent with putting more bytes into the input buffer than it currently has room for. Handle the END_OF_STREAM state by shutting down the input on your socket. Checking the capacity also gives you a means to apply back-pressure: if/when the capacity hits 0, you should pause your socket reads.
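A minimal sketch of that capacity check, so the three cases (room available, capacity 0, END_OF_STREAM) are visible in one loop. Everything named here is a stand-in invented for illustration: InputSink is a hypothetical interface that only mimics the shape of the transport calls (capacity(), plus assumed tail() and process() methods), FakeSink is a toy implementation, and Pump/Result are made-up names. It uses plain java.nio rather than Netty or Proton so it compiles on its own; treat it as a sketch, not the actual fix for ProtonTrio.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for the few transport calls this sketch relies on. */
interface InputSink {
    int END_OF_STREAM = -1;

    int capacity();     // bytes accepted right now, 0 if full, END_OF_STREAM if closed
    ByteBuffer tail();  // assumed: buffer that incoming bytes are written into
    void process();     // assumed: lets the sink consume what was written
}

final class Pump {
    enum Result { DRAINED, PAUSE_READS, CLOSE_INPUT }

    /** Copies at most capacity() bytes per pass; never writes past the sink's free space. */
    static Result pump(ByteBuffer in, InputSink sink) {
        while (in.hasRemaining()) {
            int capacity = sink.capacity();
            if (capacity == InputSink.END_OF_STREAM) {
                return Result.CLOSE_INPUT;   // caller should shut down socket input
            }
            if (capacity == 0) {
                return Result.PAUSE_READS;   // back-pressure: stop reading the socket
            }
            int n = Math.min(capacity, in.remaining());
            ByteBuffer chunk = in.duplicate();
            chunk.limit(chunk.position() + n);   // restrict the copy to n bytes
            sink.tail().put(chunk);
            in.position(in.position() + n);      // mark those bytes as consumed
            sink.process();
        }
        return Result.DRAINED;
    }
}

/** Toy sink: fixed-size buffer that optionally empties itself on process(). */
final class FakeSink implements InputSink {
    private final ByteBuffer buf;
    private final boolean drainOnProcess;
    private boolean closed;
    int consumed;

    FakeSink(int size, boolean drainOnProcess) {
        this.buf = ByteBuffer.allocate(size);
        this.drainOnProcess = drainOnProcess;
    }

    public int capacity() { return closed ? END_OF_STREAM : buf.remaining(); }
    public ByteBuffer tail() { return buf; }

    public void process() {
        if (drainOnProcess) {
            consumed += buf.position();
            buf.clear();
        }
    }

    void close() { closed = true; }
}
```

The caller keeps whatever is left in the input buffer when pump() returns PAUSE_READS and retries once the capacity is above 0 again; on CLOSE_INPUT it stops feeding bytes altogether.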
--Rafael

On Thu, Jun 12, 2014 at 12:02 PM, Clebert Suconic <[email protected]> wrote:

> I have isolated my entire Proton usage on its own module in a way to
> isolate a bug and ask for help here (instead of asking for someone to
> download the entire hornetq).
>
> As a result, I think I've isolated what most servers would need in order
> to plug proton.
>
> You have a input method on a class that I called ProtonConnection,
> input... that class is pumping bytes on Proton.
>
> And all you need is to provide two interfaces to tell how a broker should
> behave when things are happening. things like createProducer, send message,
> deliver a message to a client... etc.
>
> There are a few things that are not yet covered by Proton itself (like how
> to tell the broker to stop pumping on Proton), but these are currently
> limitations already.
>
> If we get multiple projects using the same Plug, you could have the entire
> Proton/Codec usage isolated on a single place.
>
> # I'm not sure yet I will keep this as a separate module. It will all
> depend on external interest on using it.
>
> I'm at the moment isolating this module here:
> https://github.com/clebertsuconic/ProtonPlug
>
> If you run the tests, you will see this exception:
>
> io.netty.handler.codec.DecoderException: java.nio.BufferOverflowException
>     at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:272)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>     at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:341)
>     at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:327)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:507)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:464)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>     at java.lang.Thread.run(Thread.java:724)
> Caused by: java.nio.BufferOverflowException
>     at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:208)
>     at org.hornetq.amqp.dealer.util.ProtonTrio.pump(ProtonTrio.java:130)
>     at org.hornetq.amqp.dealer.ProtonRemotingConnection.inputBuffer(ProtonRemotingConnection.java:66)
>     at org.hornetq.amqp.test.dumbserver.MinimalServer$ProtocolDecoder.decode(MinimalServer.java:159)
>     at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:241)
>     ... 11 more
>
> This is basically done at:
> ProtonTrio:
>
>    public boolean pump(ByteBuf bytes)
>    {
>       try
>       {
>          synchronized (lock)
>          {
>             if (bytes.writerIndex() < 8)
>             {
>                return false;
>             }
>             System.out.println("Size:" + bytes.writerIndex());
>             System.out.println("Capacity on target:" + transport.getInputBuffer().capacity());
>             System.out.println("Position on target:" + transport.getInputBuffer().position());
>             ByteBuffer tmp = bytes.internalNioBuffer(0, bytes.writerIndex());
>
>             /// It will explode here !!!!
>             transport.getInputBuffer().put(tmp);
>
> If I could get some help?
