Found the reason:
You should use getBoddyBuffer instead. getbodybuffer() will perform a call to checkBuffer() before returning you the large message. This should fix your test / problem. On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic <clebert.suco...@gmail.com> wrote: > > this is something done on the core API. > > I can't pinpoint the exact reason, but the JMS facade works fine with > a similar sender and producer: > > https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257 > > > > I created a branch with a test comparing Core and JMS API on my fork: > > https://github.com/clebertsuconic/activemq-artemis/tree/withCore > > > > I'm pasting the test here for future reference if the gist is ever > gone (or the branch is gone): > > > /** > * Licensed to the Apache Software Foundation (ASF) under one or more > * contributor license agreements. See the NOTICE file distributed with > * this work for additional information regarding copyright ownership. > * The ASF licenses this file to You under the Apache License, Version 2.0 > * (the "License"); you may not use this file except in compliance with > * the License. You may obtain a copy of the License at > * <p> > * http://www.apache.org/licenses/LICENSE-2.0 > * <p> > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > > package org.apache.activemq.artemis.tests.integration.paging; > > import javax.jms.Connection; > import javax.jms.ConnectionFactory; > import javax.jms.MessageConsumer; > import javax.jms.MessageProducer; > import javax.jms.Queue; > import javax.jms.Session; > import javax.jms.TextMessage; > import java.util.concurrent.atomic.AtomicInteger; > > import org.apache.activemq.artemis.api.core.QueueConfiguration; > import org.apache.activemq.artemis.api.core.RoutingType; > import org.apache.activemq.artemis.api.core.SimpleString; > import org.apache.activemq.artemis.api.core.client.ActiveMQClient; > import org.apache.activemq.artemis.api.core.client.ClientMessage; > import org.apache.activemq.artemis.api.core.client.ClientProducer; > import org.apache.activemq.artemis.api.core.client.ClientSession; > import org.apache.activemq.artemis.core.config.Configuration; > import org.apache.activemq.artemis.core.server.ActiveMQServer; > import org.apache.activemq.artemis.core.server.impl.AddressInfo; > import > org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; > import org.apache.activemq.artemis.core.settings.impl.AddressSettings; > import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; > import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; > import org.apache.activemq.artemis.tests.util.Wait; > import org.checkerframework.checker.units.qual.A; > import org.junit.Assert; > import org.junit.Before; > import org.junit.Test; > > public class Main extends ActiveMQTestBase { > > private final static String QUEUE = > "service.images.dev::service.images.dev"; > > ActiveMQServer server; > > > @Before > @Override > public void setUp() throws Exception { > super.setUp(); > > Configuration config = createDefaultConfig(0, > true).setJournalSyncNonTransactional(false); > > config.setMessageExpiryScanPeriod(-1); > server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * > 1024); > > server.getAddressSettingsRepository().clear(); > > AddressSettings defaultSetting = new > AddressSettings().setPageSizeBytes(100 * 1024 * > 1024).setMaxSizeBytes(10 * 1024 * > 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false); > > server.getAddressSettingsRepository().addMatch("#", defaultSetting); > > > server.start(); > > > server.addAddressInfo(new > AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST)); > server.createQueue(new > QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST)); > > } > > @Test > public void testSending() throws Exception { > > final String username = null; > final String password = null; > > var serverLocator = > ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(true).setBlockOnNonDurableSend(true).setMinLargeMessageSize(1024); > > final var sessionFactory = serverLocator.createSessionFactory(); > > final var xa = false; > final var autoCommitSends = true; > final var autoCommitAcks = true; > final var ackBatchSize = serverLocator.getAckBatchSize(); > final var preAcknowledge = serverLocator.isPreAcknowledge(); > final var clientSession = sessionFactory.createSession(username, > password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, > ackBatchSize); > > var queueQueryResult = > clientSession.queueQuery(SimpleString.toSimpleString(QUEUE)); > if (!queueQueryResult.isExists()) { > clientSession.createQueue(_ServiceQueueConfiguration(new > SimpleString(QUEUE))); > } > > final var consumer = clientSession.createConsumer(QUEUE); > > clientSession.start(); > > AtomicInteger errors = new AtomicInteger(0); > AtomicInteger received = new AtomicInteger(0); > > consumer.setMessageHandler((msg) -> { > try { > msg.getDataBuffer(); > received.incrementAndGet(); > } catch (Throwable e) { > e.printStackTrace(); > errors.incrementAndGet(); > } > }); > > > try (ClientSession producerSession = sessionFactory.createSession()) { > ClientProducer producer = producerSession.createProducer(QUEUE); > for (int i = 0; i < 100; i++) { > ClientMessage message = producerSession.createMessage(true); > message.getBodyBuffer().writeBytes(new byte[1024 * 1024]); > producer.send(message); > } > } > > Wait.assertEquals(100, received::get); > Assert.assertEquals(0, errors.get()); > } > > @Test > public void testWithJMSListener() throws Exception { > > final String username = null; > final String password = null; > > ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); > factory.setMinLargeMessageSize(1024); > Connection connection = factory.createConnection(); > Session sessionProducer = connection.createSession(true, > Session.SESSION_TRANSACTED); > > Session sessionConsumer = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > > Queue jmsQueue = sessionProducer.createQueue(QUEUE); > > MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue); > > > connection.start(); > > AtomicInteger errors = new AtomicInteger(0); > AtomicInteger received = new AtomicInteger(0); > > consumer.setMessageListener((msg) -> { > try { > System.out.println("Received: " + > ((TextMessage)msg).getText().length()); > received.incrementAndGet(); > } catch (Throwable e) { > e.printStackTrace(); > errors.incrementAndGet(); > } > > }); > > MessageProducer producer = sessionProducer.createProducer(jmsQueue); > > StringBuffer buffer = new StringBuffer(); > while (buffer.length() < 100 * 1024) { > buffer.append("*****"); > } > > for (int i = 0; i < 100; i++) { > TextMessage message = > sessionProducer.createTextMessage(buffer.toString()); > producer.send(message); > } > sessionProducer.commit(); > > Wait.assertEquals(100, received::get); > Assert.assertEquals(0, errors.get()); > } > > private static QueueConfiguration > _ServiceQueueConfiguration(SimpleString queueName) { > final var config = new QueueConfiguration(queueName); > config.setMaxConsumers(1); > config.setPurgeOnNoConsumers(false); > config.setDurable(false); > config.setAutoDelete(false); > config.setRoutingType(RoutingType.MULTICAST); > return config; > } > } > > On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jbert...@apache.org> wrote: > > > > No. The test was super-simple. Just send one large message and then consume > > it. > > > > > > Justin > > > > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <clebert.suco...@gmail.com> > > wrote: > > > > > It did not involved paging ? > > > > > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jbert...@apache.org> > > > wrote: > > > > > > > I recreated this exception with a very simple test-case. I took the > > > > consumer code pasted earlier in the thread and just added a producer > > > > sending a large message. I lowered the minLargeMessageSize to make it > > > > faster. I thought I still had that code laying around somewhere, but I > > > > can't find it at the moment. > > > > > > > > > > > > Justin > > > > > > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic < > > > clebert.suco...@gmail.com > > > > > > > > > wrote: > > > > > > > > > I have seen (and fixed) cases where the large message file is gone. I > > > > > would need a reproducer creating the issue from scratch (send and > > > > consume) > > > > > > > > > > Typically it could be associated with paging ? Did you have the > > > > > destination in page mode ? > > > > > > > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <t...@abcwxy.com> wrote: > > > > > > > > > > > The client code below the stack trace - will reproduce it - the > > > > > > msg.getDataBuffer() inside the handler of that code will trigger it > > > > when > > > > > a > > > > > > large message is sent to the address. The exception in question > > > > > > (IndexOutOfBoundsException) is being caught in line 249 of the > > > > > > apache > > > > > > CoreMessage code... and turned into a logged warning on line 250. > > > The > > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer() > > > > (which > > > > > > appears to be fine from a quick survey - data also seems to be > > > > preserved > > > > > > and accessible from client code) - it is just not clear what the > > > intent > > > > > is > > > > > > if the exception code is executed... is it a "don't worry about it" > > > > > > - > > > > or > > > > > a > > > > > > "something is wrong here" and I should concern myself with > > > > > > something? > > > > > (the > > > > > > warning level is making me believe that it is more than a "don't > > > worry > > > > > > about it" - but the fact that the exception was caught and a valid > > > > buffer > > > > > > is returned makes me think it is just a fallback for the other > > > choices > > > > of > > > > > > buffers - and I should not worry about it?). > > > > > > > > > > > > Thanks for any insight you may have.... > > > > > > > > > > > > logged warning from the caught exception is: > > > > > > > > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage - > > > > > readerIndex(4) > > > > > > + length(270740) exceeds writerIndex(4): > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > 4, > > > > > > widx: 4, cap: 270740) > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740) > > > > > > exceeds writerIndex(4): > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > 4, > > > > > > widx: 4, cap: 270740) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) > > > > > > at > > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241) > > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) > > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG > > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl - > > > > Sending > > > > > > 65962 from flow-control > > > > > > > > > > > > Simple Client Code below: > > > > > > > > > > > > > > > > > > public class Main { > > > > > > > > > > > > private final static String ACCEPTOR = "tcp://localhost:9322"; > > > > > > private final static String QUEUE="service.images.dev:: > > > > > > service.images.dev"; > > > > > > > > > > > > public static void main(String[] args) throws Exception { > > > > > > > > > > > > final String username = null; > > > > > > final String password = null; > > > > > > > > > > > > var serverLocator = ActiveMQClient > > > > > > .createServerLocator(ACCEPTOR) > > > > > > .setBlockOnDurableSend(true) > > > > > > .setBlockOnNonDurableSend(true); > > > > > > > > > > > > final var sessionFactory = > > > > serverLocator.createSessionFactory(); > > > > > > > > > > > > final var xa = false; > > > > > > final var autoCommitSends = true; > > > > > > final var autoCommitAcks = true; > > > > > > final var ackBatchSize = serverLocator.getAckBatchSize(); > > > > > > final var preAcknowledge = serverLocator.isPreAcknowledge(); > > > > > > final var clientSession = sessionFactory.createSession( > > > > > > username, > > > > > > password, > > > > > > xa, > > > > > > autoCommitSends, > > > > > > autoCommitAcks, > > > > > > preAcknowledge, > > > > > > ackBatchSize > > > > > > ); > > > > > > > > > > > > var queueQueryResult = > > > > > > clientSession.queueQuery(SimpleString.toSimpleString(QUEUE)); > > > > > > if (!queueQueryResult.isExists()) { > > > > > > clientSession.createQueue(_ServiceQueueConfiguration(new > > > > > > SimpleString(QUEUE))); > > > > > > } > > > > > > > > > > > > final var consumer = clientSession.createConsumer(QUEUE); > > > > > > > > > > > > clientSession.start(); > > > > > > > > > > > > consumer.setMessageHandler((msg) -> { > > > > > > > > > > > > System.out.println("Received: "+msg.getBodySize()); > > > > > > msg.getDataBuffer(); > > > > > > > > > > > > }); > > > > > > > > > > > > while(true) { > > > > > > Thread.sleep(1000); > > > > > > } > > > > > > > > > > > > } > > > > > > > > > > > > private static QueueConfiguration > > > > > > _ServiceQueueConfiguration(SimpleString queueName) { > > > > > > final var config = new QueueConfiguration(queueName); > > > > > > config.setMaxConsumers(1); > > > > > > config.setPurgeOnNoConsumers(false); > > > > > > config.setDurable(false); > > > > > > config.setAutoDelete(false); > > > > > > config.setRoutingType(RoutingType.MULTICAST); > > > > > > return config; > > > > > > } > > > > > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jbert...@apache.org> > > > > > > wrote: > > > > > > > > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you > > > have a > > > > > way > > > > > > > to reproduce this? > > > > > > > > > > > > > > > > > > > > > Justin > > > > > > > > > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <t...@abcwxy.com> wrote: > > > > > > > > > > > > > > > This seems to appear on larger messages only - I am getting a > > > > warning > > > > > > > when > > > > > > > > calling getDataBuffer (2.20.0 Artemis Client). Curious if there > > > is > > > > > > > > something I may be missing - or if this is completely ignorable? > > > > > > Thanks - > > > > > > > > Tim > > > > > > > > > > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2 > > > > > > (ActiveMQ-client-global-threads)] > > > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2 > > > > > > > (ActiveMQ-client-global-threads)] > > > > > > > > readerIndex(270740) + length(270740) exceeds > > > > > > > > writerIndex(271572): > > > > > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > > > > > 270740, widx: 271572, cap: 271572) > > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) + > > > > > > length(270740) > > > > > > > > exceeds writerIndex(271572): > > > > > > > > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: > > > > > > > > 270740, widx: 271572, cap: 271572) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) > > > > > > > > at > > > > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264) > > > > > > > > at > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241) > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > Clebert Suconic > > > > > > > > > > > > -- > > > Clebert Suconic > > > > > > > -- > Clebert Suconic -- Clebert Suconic