[ https://issues.apache.org/jira/browse/ARTEMIS-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Justin Bertram resolved ARTEMIS-2325. ------------------------------------- Fix Version/s: 2.12.0 Resolution: Fixed > SendAcknowledgementHandler when multiple mesages are sent > --------------------------------------------------------- > > Key: ARTEMIS-2325 > URL: https://issues.apache.org/jira/browse/ARTEMIS-2325 > Project: ActiveMQ Artemis > Issue Type: Bug > Environment: Using maven artifact version > {color:#6a8759}artemis-core-client 2.7.0 > {color} > Reporter: Riyafa Abdul Hameed > Priority: Major > Fix For: 2.12.0 > > Time Spent: 1h 50m > Remaining Estimate: 0h > > When I try to send multiple message while using a > SendAcknowledgementHandler the following code fails: > {code:java} > import org.apache.activemq.artemis.api.core.client.ActiveMQClient; > import org.apache.activemq.artemis.api.core.client.ClientMessage; > import org.apache.activemq.artemis.api.core.client.ClientProducer; > import org.apache.activemq.artemis.api.core.client.ClientSession; > import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; > import org.apache.activemq.artemis.api.core.client.ServerLocator; > public class ProducerInvalid { > public static void main(String[] args) throws Exception { > ServerLocator locator = > ActiveMQClient.createServerLocator("tcp://localhost:61616"); > locator.setConfirmationWindowSize(10240); > ClientSessionFactory factory = locator.createSessionFactory(); > ClientSession session = factory.createSession(); > // A producer is associated with an address ... > ClientProducer producer = session.createProducer("example"); > for (int i = 0; i < 1000000; i++) { > ClientMessage message = session.createMessage(true); > message.getBodyBuffer().writeString("Hello " + i); > producer.send(message, message1 -> > System.out.println(message1.getBodyBuffer().readString())); > } > } > }{code} > The exception thrown is as follows: > Apr 29, 2019 11:08:44 AM > {code:java} > org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler > bufferReceived > ERROR: AMQ214031: Failed to decode buffer, disconnect immediately. > java.lang.IllegalStateException: java.lang.IndexOutOfBoundsException: > readerIndex(22) + length(4) exceeds writerIndex(22): > UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: 1500, unwrapped: > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: > 70, cap: 1500)) > at > org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:381) > at > org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl$DelegatingBufferHandler.bufferReceived(ClientSessionFactoryImpl.java:1191) > at > org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.channelRead(ActiveMQChannelHandler.java:73) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337) > at > io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323) > at > io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310) > at > io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:426) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) > at > io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337) > at > io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359) > at > io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) > at > io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796) > at > io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) > at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) > at > io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) > at > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) > Caused by: java.lang.IndexOutOfBoundsException: readerIndex(22) + length(4) > exceeds writerIndex(22): UnpooledDuplicatedByteBuf(ridx: 22, widx: 22, cap: > 1500, unwrapped: > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: > 70, cap: 1500)) > at > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1428) > at io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:802) > at io.netty.buffer.WrappedByteBuf.readInt(WrappedByteBuf.java:571) > at > org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readStringInternal(ChannelBufferWrapper.java:92) > at > org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper.readString(ChannelBufferWrapper.java:88) > at > org.riyafa.mytests.ProducerInvalid.lambda$main$0(ProducerInvalid.java:23) > at > org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.callSendAck(ActiveMQSessionContext.java:232) > at > org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$2.handleResponse(ActiveMQSessionContext.java:220) > at > org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext$1.commandConfirmed(ActiveMQSessionContext.java:203) > at > org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.clearUpTo(ChannelImpl.java:755) > at > org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.handlePacket(ChannelImpl.java:693) > at > org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.doBufferReceived(RemotingConnectionImpl.java:399) > at > org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:376) > ... 21 more{code} > I am not sure if SendAcknowledgementHandler is not supposed to be used in the > above manner. > The following works fine: > {code:java} > import org.apache.activemq.artemis.api.core.client.*; > public class Producer { > public static void main(String[] args) throws Exception { > ServerLocator locator = > ActiveMQClient.createServerLocator("tcp://localhost:61616"); > locator.setConfirmationWindowSize(10240); > ClientSessionFactory factory = locator.createSessionFactory(); > ClientSession session = factory.createSession(); > session.setSendAcknowledgementHandler(message1 -> > System.out.println(message1.getBodyBuffer().readString())); > // A producer is associated with an address ... > ClientProducer producer = session.createProducer("example"); > for (int i = 0; i < 1000000; i++) { > ClientMessage message = session.createMessage(true); > message.getBodyBuffer().writeString("Hello " + i); > producer.send(message); > } > } > }{code} > But I would like to have an acknowledgment handler per send. Is it not > possible? -- This message was sent by Atlassian Jira (v8.3.4#803005)