Found the reason:

You should use getBoddyBuffer instead.


getbodybuffer() will perform a call to checkBuffer() before returning
you the large message.


This should fix your test / problem.

On Tue, Feb 22, 2022 at 1:03 PM Clebert Suconic
<clebert.suco...@gmail.com> wrote:
>
> this is something done on the core API.
>
> I can't pinpoint the exact reason, but the JMS facade works fine with
> a similar sender and producer:
>
> https://gist.github.com/clebertsuconic/03fe7206914d8753e9bd966f805a0257
>
>
>
> I created a branch with a test comparing Core and JMS API on my fork:
>
> https://github.com/clebertsuconic/activemq-artemis/tree/withCore
>
>
>
> I'm pasting the test here for future reference if the gist is ever
> gone (or the branch is gone):
>
>
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements. See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License. You may obtain a copy of the License at
>  * <p>
>  * http://www.apache.org/licenses/LICENSE-2.0
>  * <p>
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.activemq.artemis.tests.integration.paging;
>
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import org.apache.activemq.artemis.api.core.QueueConfiguration;
> import org.apache.activemq.artemis.api.core.RoutingType;
> import org.apache.activemq.artemis.api.core.SimpleString;
> import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
> import org.apache.activemq.artemis.api.core.client.ClientMessage;
> import org.apache.activemq.artemis.api.core.client.ClientProducer;
> import org.apache.activemq.artemis.api.core.client.ClientSession;
> import org.apache.activemq.artemis.core.config.Configuration;
> import org.apache.activemq.artemis.core.server.ActiveMQServer;
> import org.apache.activemq.artemis.core.server.impl.AddressInfo;
> import 
> org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
> import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
> import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
> import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
> import org.apache.activemq.artemis.tests.util.Wait;
> import org.checkerframework.checker.units.qual.A;
> import org.junit.Assert;
> import org.junit.Before;
> import org.junit.Test;
>
> public class Main extends ActiveMQTestBase {
>
>    private final static String QUEUE = 
> "service.images.dev::service.images.dev";
>
>    ActiveMQServer server;
>
>
>    @Before
>    @Override
>    public void setUp() throws Exception {
>       super.setUp();
>
>       Configuration config = createDefaultConfig(0,
> true).setJournalSyncNonTransactional(false);
>
>       config.setMessageExpiryScanPeriod(-1);
>       server = createServer(true, config, 100 * 1024 * 1024, 10 * 1024 * 
> 1024);
>
>       server.getAddressSettingsRepository().clear();
>
>       AddressSettings defaultSetting = new
> AddressSettings().setPageSizeBytes(100 * 1024 *
> 1024).setMaxSizeBytes(10 * 1024 *
> 1024).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
>
>       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
>
>
>       server.start();
>
>
>       server.addAddressInfo(new
> AddressInfo(QUEUE).addRoutingType(RoutingType.ANYCAST));
>       server.createQueue(new
> QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
>
>    }
>
>    @Test
>    public void testSending() throws Exception {
>
>       final String username = null;
>       final String password = null;
>
>       var serverLocator =
> ActiveMQClient.createServerLocator("tcp://localhost:61616").setBlockOnDurableSend(true).setBlockOnNonDurableSend(true).setMinLargeMessageSize(1024);
>
>       final var sessionFactory = serverLocator.createSessionFactory();
>
>       final var xa = false;
>       final var autoCommitSends = true;
>       final var autoCommitAcks = true;
>       final var ackBatchSize = serverLocator.getAckBatchSize();
>       final var preAcknowledge = serverLocator.isPreAcknowledge();
>       final var clientSession = sessionFactory.createSession(username,
> password, xa, autoCommitSends, autoCommitAcks, preAcknowledge,
> ackBatchSize);
>
>       var queueQueryResult =
> clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
>       if (!queueQueryResult.isExists()) {
>          clientSession.createQueue(_ServiceQueueConfiguration(new
> SimpleString(QUEUE)));
>       }
>
>       final var consumer = clientSession.createConsumer(QUEUE);
>
>       clientSession.start();
>
>       AtomicInteger errors = new AtomicInteger(0);
>       AtomicInteger received = new AtomicInteger(0);
>
>       consumer.setMessageHandler((msg) -> {
>          try {
>             msg.getDataBuffer();
>             received.incrementAndGet();
>          } catch (Throwable e) {
>             e.printStackTrace();
>             errors.incrementAndGet();
>          }
>       });
>
>
>       try (ClientSession producerSession = sessionFactory.createSession()) {
>          ClientProducer producer = producerSession.createProducer(QUEUE);
>          for (int i = 0; i < 100; i++) {
>             ClientMessage message = producerSession.createMessage(true);
>             message.getBodyBuffer().writeBytes(new byte[1024 * 1024]);
>             producer.send(message);
>          }
>       }
>
>       Wait.assertEquals(100, received::get);
>       Assert.assertEquals(0, errors.get());
>    }
>
>    @Test
>    public void testWithJMSListener() throws Exception {
>
>       final String username = null;
>       final String password = null;
>
>       ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
>       factory.setMinLargeMessageSize(1024);
>       Connection connection = factory.createConnection();
>       Session sessionProducer = connection.createSession(true,
> Session.SESSION_TRANSACTED);
>
>       Session sessionConsumer = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
>       Queue jmsQueue = sessionProducer.createQueue(QUEUE);
>
>       MessageConsumer consumer = sessionConsumer.createConsumer(jmsQueue);
>
>
>       connection.start();
>
>       AtomicInteger errors = new AtomicInteger(0);
>       AtomicInteger received = new AtomicInteger(0);
>
>       consumer.setMessageListener((msg) -> {
>          try {
>             System.out.println("Received: " +
> ((TextMessage)msg).getText().length());
>             received.incrementAndGet();
>          } catch (Throwable e) {
>             e.printStackTrace();
>             errors.incrementAndGet();
>          }
>
>       });
>
>       MessageProducer producer = sessionProducer.createProducer(jmsQueue);
>
>       StringBuffer buffer = new StringBuffer();
>       while (buffer.length() < 100 * 1024) {
>          buffer.append("*****");
>       }
>
>       for (int i = 0; i < 100; i++) {
>          TextMessage message =
> sessionProducer.createTextMessage(buffer.toString());
>          producer.send(message);
>       }
>       sessionProducer.commit();
>
>       Wait.assertEquals(100, received::get);
>       Assert.assertEquals(0, errors.get());
>    }
>
>    private static QueueConfiguration
> _ServiceQueueConfiguration(SimpleString queueName) {
>       final var config = new QueueConfiguration(queueName);
>       config.setMaxConsumers(1);
>       config.setPurgeOnNoConsumers(false);
>       config.setDurable(false);
>       config.setAutoDelete(false);
>       config.setRoutingType(RoutingType.MULTICAST);
>       return config;
>    }
> }
>
> On Mon, Feb 21, 2022 at 2:00 PM Justin Bertram <jbert...@apache.org> wrote:
> >
> > No. The test was super-simple. Just send one large message and then consume
> > it.
> >
> >
> > Justin
> >
> > On Mon, Feb 21, 2022 at 12:57 PM Clebert Suconic <clebert.suco...@gmail.com>
> > wrote:
> >
> > > It did not involved paging ?
> > >
> > > On Mon, Feb 21, 2022 at 11:07 AM Justin Bertram <jbert...@apache.org>
> > > wrote:
> > >
> > > > I recreated this exception with a very simple test-case. I took the
> > > > consumer code pasted earlier in the thread and just added a producer
> > > > sending a large message. I lowered the minLargeMessageSize to make it
> > > > faster. I thought I still had that code laying around somewhere, but I
> > > > can't find it at the moment.
> > > >
> > > >
> > > > Justin
> > > >
> > > > On Sun, Feb 20, 2022 at 9:03 AM Clebert Suconic <
> > > clebert.suco...@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > I have seen (and fixed) cases where the large message file is gone.  I
> > > > > would need a reproducer creating the issue from scratch (send and
> > > > consume)
> > > > >
> > > > > Typically it could be associated with paging ?  Did you have the
> > > > > destination in page mode ?
> > > > >
> > > > > On Thu, Feb 17, 2022 at 6:53 PM Tim Jones <t...@abcwxy.com> wrote:
> > > > >
> > > > > > The client code below the stack trace - will reproduce it  - the
> > > > > > msg.getDataBuffer() inside the handler of that code will trigger it
> > > > when
> > > > > a
> > > > > > large message is sent to the address.  The exception in question
> > > > > > (IndexOutOfBoundsException) is being caught in line 249 of the 
> > > > > > apache
> > > > > > CoreMessage code... and turned into a logged warning on line 250.
> > > The
> > > > > > CoreMessage code then returns the buffer from getReadOnlyBuffer()
> > > > (which
> > > > > > appears to be fine from a quick survey - data also seems to be
> > > > preserved
> > > > > > and accessible from client code) - it is just not clear what the
> > > intent
> > > > > is
> > > > > > if the exception code is executed... is it a "don't worry about it" 
> > > > > > -
> > > > or
> > > > > a
> > > > > > "something is wrong here" and I should concern myself with 
> > > > > > something?
> > > > > (the
> > > > > > warning level is making me believe that it is more than a "don't
> > > worry
> > > > > > about it" - but the fact that the exception was caught and a valid
> > > > buffer
> > > > > > is returned makes me think it is just a fallback for the other
> > > choices
> > > > of
> > > > > > buffers - and I should not worry about it?).
> > > > > >
> > > > > > Thanks for any insight you may have....
> > > > > >
> > > > > > logged warning from the caught exception is:
> > > > > >
> > > > > > 16:26:50.923 [Thread-0 (ActiveMQ-client-global-threads)] WARN
> > > > > > org.apache.activemq.artemis.core.message.impl.CoreMessage -
> > > > > readerIndex(4)
> > > > > > + length(270740) exceeds writerIndex(4):
> > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > 4,
> > > > > > widx: 4, cap: 270740)
> > > > > > java.lang.IndexOutOfBoundsException: readerIndex(4) + length(270740)
> > > > > > exceeds writerIndex(4):
> > > > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > 4,
> > > > > > widx: 4, cap: 270740)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > at
> > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > at io.m45.sart.Main.lambda$main$0(Main.java:57)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> > > > > > at
> > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
> > > > > > 16:26:50.924 [Thread-0 (ActiveMQ-client-netty-threads)] DEBUG
> > > > > > org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl -
> > > > Sending
> > > > > > 65962 from flow-control
> > > > > >
> > > > > > Simple Client Code below:
> > > > > >
> > > > > >
> > > > > > public class Main {
> > > > > >
> > > > > >     private final static String ACCEPTOR = "tcp://localhost:9322";
> > > > > >     private final static String QUEUE="service.images.dev::
> > > > > > service.images.dev";
> > > > > >
> > > > > >     public static void main(String[] args) throws Exception {
> > > > > >
> > > > > >         final String username = null;
> > > > > >         final String password = null;
> > > > > >
> > > > > >         var serverLocator = ActiveMQClient
> > > > > >                 .createServerLocator(ACCEPTOR)
> > > > > >                 .setBlockOnDurableSend(true)
> > > > > >                 .setBlockOnNonDurableSend(true);
> > > > > >
> > > > > >         final var sessionFactory =
> > > > serverLocator.createSessionFactory();
> > > > > >
> > > > > >         final var xa = false;
> > > > > >         final var autoCommitSends = true;
> > > > > >         final var autoCommitAcks = true;
> > > > > >         final var ackBatchSize = serverLocator.getAckBatchSize();
> > > > > >         final var preAcknowledge = serverLocator.isPreAcknowledge();
> > > > > >         final var clientSession = sessionFactory.createSession(
> > > > > >                 username,
> > > > > >                 password,
> > > > > >                 xa,
> > > > > >                 autoCommitSends,
> > > > > >                 autoCommitAcks,
> > > > > >                 preAcknowledge,
> > > > > >                 ackBatchSize
> > > > > >         );
> > > > > >
> > > > > >         var queueQueryResult =
> > > > > > clientSession.queueQuery(SimpleString.toSimpleString(QUEUE));
> > > > > >         if (!queueQueryResult.isExists()) {
> > > > > >             clientSession.createQueue(_ServiceQueueConfiguration(new
> > > > > > SimpleString(QUEUE)));
> > > > > >         }
> > > > > >
> > > > > >         final var consumer = clientSession.createConsumer(QUEUE);
> > > > > >
> > > > > >         clientSession.start();
> > > > > >
> > > > > >         consumer.setMessageHandler((msg) -> {
> > > > > >
> > > > > >             System.out.println("Received: "+msg.getBodySize());
> > > > > >             msg.getDataBuffer();
> > > > > >
> > > > > >         });
> > > > > >
> > > > > >         while(true) {
> > > > > >             Thread.sleep(1000);
> > > > > >         }
> > > > > >
> > > > > >     }
> > > > > >
> > > > > >     private static QueueConfiguration
> > > > > > _ServiceQueueConfiguration(SimpleString queueName) {
> > > > > >         final var config = new QueueConfiguration(queueName);
> > > > > >         config.setMaxConsumers(1);
> > > > > >         config.setPurgeOnNoConsumers(false);
> > > > > >         config.setDurable(false);
> > > > > >         config.setAutoDelete(false);
> > > > > >         config.setRoutingType(RoutingType.MULTICAST);
> > > > > >         return config;
> > > > > >     }
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 17, 2022 at 2:28 PM Justin Bertram <jbert...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Typically an IndexOutOfBoundsException indicates a bug. Do you
> > > have a
> > > > > way
> > > > > > > to reproduce this?
> > > > > > >
> > > > > > >
> > > > > > > Justin
> > > > > > >
> > > > > > > On Thu, Feb 17, 2022 at 3:17 PM Tim Jones <t...@abcwxy.com> wrote:
> > > > > > >
> > > > > > > > This seems to appear on larger messages only - I am getting a
> > > > warning
> > > > > > > when
> > > > > > > > calling getDataBuffer (2.20.0 Artemis Client).  Curious if there
> > > is
> > > > > > > > something I may be missing - or if this is completely ignorable?
> > > > > > Thanks -
> > > > > > > > Tim
> > > > > > > >
> > > > > > > > 2022-02-17T21:06:11,908+01:00 [Thread-2
> > > > > > (ActiveMQ-client-global-threads)]
> > > > > > > > WARN o.a.a.a.c.m.i.CoreMessage [Thread-2
> > > > > > > (ActiveMQ-client-global-threads)]
> > > > > > > > readerIndex(270740) + length(270740) exceeds 
> > > > > > > > writerIndex(271572):
> > > > > > > >
> > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > > java.lang.IndexOutOfBoundsException: readerIndex(270740) +
> > > > > > length(270740)
> > > > > > > > exceeds writerIndex(271572):
> > > > > > > >
> > > > UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx:
> > > > > > > > 270740, widx: 271572, cap: 271572)
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442)
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428)
> > > > > > > > at
> > > > > io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:937)
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.client.impl.ClientMessageImpl$DecodingContext.readInto(ClientMessageImpl.java:407)
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getLargeMessageBuffer(CoreMessage.java:264)
> > > > > > > > at
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > org.apache.activemq.artemis.core.message.impl.CoreMessage.getDataBuffer(CoreMessage.java:241)
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > --
> > > > > Clebert Suconic
> > > > >
> > > >
> > > --
> > > Clebert Suconic
> > >
>
>
>
> --
> Clebert Suconic



-- 
Clebert Suconic

Reply via email to