Hi, just to clarify: are consumers meant to be able to remove the message from an LVQ when non-destructive is true?
I ran a test: for an LVQ with non-destructive set to true, the producer sent a message and the consumer received it. After the broker restarted, the message was gone.

<michael.andre.pea...@me.com.invalid> wrote on Fri, Jul 5, 2019 at 1:33 PM:

> For lvq when non destructive is false consumers are meant to be able to
> remove the message by acking.
>
> Use case there is coalesced price updates where you just care about the
> latest value, but the updates are faster than a consumer may deal with.
>
> Behaviour is as expected.
>
> On Fri, Jul 5, 2019 at 2:53 AM +0100, "yw yw" <wy96...@gmail.com> wrote:
>
> Yes, the last value always wins.
>
> The document says "Another common pattern is to have queue "browsers" which
> send all messages to the browser, but do not prevent other consumers from
> receiving the messages, and do not remove them from the queue when the
> browser is done with them. Such a browser is an instance of a
> "non-destructive" consumer." The fact is we don't remove them in memory but
> we append records in the journal. When the consumer acks the message and
> the reference count of the message is decremented to zero, the ack record
> and the delete message record are written into the journal. If the broker
> restarts, the last value is lost. Not sure it's what we expect?
>
> michael.andre.pearce wrote on Thu, Jul 4, 2019 at 11:30 PM:
>
> > Non destructive is so a consumer doesn't ack the message. Essentially
> > meaning the last value is always kept in the lvq. When a new message
> > replaces an old one, the old one needs to be acked so that it is
> > removed; this is the point.
> >
> > -------- Original message --------
> > From: yw yw
> > Date: 04/07/2019 08:42 (GMT+00:00)
> > To: users@activemq.apache.org
> > Subject: Re: AMQ 224038 on Last Value Queue
> >
> > Hi,
> >
> > We have encountered the same problems these days. Right now the
> > JMSNonDestructiveTest passes successfully because persistence is
> > disabled, which means there are no journal operations. If we enable
> > persistence, the test fails in testNonDestructiveLVQTombstone(). The
> > steps to reproduce are the same as what Matt described:
> >
> > 1. First send a message.
> > 2. Then receive the message. The key point is here: we ack this message
> >    and delete the message, so the record is removed from the "records"
> >    map.
> > 3. Last, send another message.
> >
> > The exception occurs as below:
> >
> > [Thread-13 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@77a57272)] 15:01:06,990 ERROR [org.apache.activemq.artemis.core.server] AMQ224038: Failed to ack old reference: java.lang.IllegalStateException: Cannot find add info 446 on compactor or current records
> >     at org.apache.activemq.artemis.core.journal.impl.JournalImpl.checkKnownRecordID(JournalImpl.java:1081) [:]
> >     at org.apache.activemq.artemis.core.journal.impl.JournalImpl.appendUpdateRecord(JournalImpl.java:888) [:]
> >     at org.apache.activemq.artemis.core.journal.Journal.appendUpdateRecord(Journal.java:98) [:]
> >     at org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager.storeAcknowledge(AbstractJournalStorageManager.java:425) [:]
> >     at org.apache.activemq.artemis.core.server.impl.QueueImpl.acknowledge(QueueImpl.java:1539) [:]
> >     at org.apache.activemq.artemis.core.server.impl.LastValueQueue.acknowledge(LastValueQueue.java:193) [:]
> >     at org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl.acknowledge(MessageReferenceImpl.java:235) [:]
> >     at org.apache.activemq.artemis.core.server.impl.LastValueQueue.replaceLVQMessage(LastValueQueue.java:172) [:]
> >     at org.apache.activemq.artemis.core.server.impl.LastValueQueue.addTail(LastValueQueue.java:107) [:]
> >     at org.apache.activemq.artemis.core.server.impl.RoutingContextImpl.internalprocessReferences(RoutingContextImpl.java:164) [:]
> >     at org.apache.activemq.artemis.core.server.impl.RoutingContextImpl.processReferences(RoutingContextImpl.java:159) [:]
> >     at org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl$2.done(PostOfficeImpl.java:1378) [:]
> >     at org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl$1.run(OperationContextImpl.java:244) [:]
> >     at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
> >     at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
> >     at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_102]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_102]
> >     at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]
> >
> > I'm confused by the current LastValueQueue design. Why do we acknowledge
> > the message (I mean for the normal ack reason) if the queue is
> > non-destructive? What if we acknowledged messages and the messages were
> > deleted in the journal, then the broker restarted? I assume these
> > messages would be lost, which seems against the non-destructive
> > principle? It seems like we don't really need to acknowledge messages
> > except for KILLED/EXPIRED/REPLACED reasons for the non-destructive queue?
> >
> > Justin Bertram wrote on Thu, Jul 4, 2019 at 11:27 AM:
> >
> > > There are a couple of tests in the test suite which use both last-value
> > > and non-destructive queue attributes (e.g. in
> > > org.apache.activemq.artemis.tests.integration.amqp.JMSNonDestructiveTest).
> > > These work without issue. I also took the code you pasted and tried to
> > > reproduce the failure, but everything appears to be working as expected.
> > > For what it's worth, I'm testing on the tip of the master branch (i.e.
> > > 2.10.0-SNAPSHOT).
> > >
> > > Would it be possible for you to provide more details about how to
> > > reproduce this failure? Perhaps some simple modifications to the
> > > last-value-queue example distributed with the broker would suffice.
> > >
> > > Justin
> > >
> > > On Wed, Jul 3, 2019 at 4:44 PM mschmeiser wrote:
> > >
> > > > Hello
> > > >
> > > > I am getting an error "AMQ224039: Failed to ack old reference:
> > > > java.lang.IllegalStateException: Cannot find add info 5698 on
> > > > compactor or current records" after every publish of a message to a
> > > > Last Value Queue except the first time.
> > > >
> > > > I am on ActiveMQ Artemis 2.8.1. I know there's a more recent version
> > > > but checked the release notes and none of the bug fixes looked like
> > > > they'd be relevant to this.
> > > >
> > > > I'm not working with a deployment-ready system yet, this is very
> > > > early stage prototyping. I have a single broker with the basic
> > > > configuration, I'm basically running things 'out of the box'.
> > > > The Java code that I'm using looks something like:
> > > >
> > > > ***
> > > >
> > > > ActiveMQQueue destination = ActiveMQQueue.createQueue(subject +
> > > >     "?last-value-key=LVK&non-destructive=true");
> > > >
> > > > MessageProducer producer = session.createProducer(destination);
> > > >
> > > > TextMessage message = session.createTextMessage("test object");
> > > > message.setStringProperty("LVK", "foo");
> > > >
> > > > Thread.sleep(500);
> > > >
> > > > session.createConsumer(destination, "LVK = 'foo'").setMessageListener(m ->
> > > >     processMessage(m));
> > > >
> > > > Thread.sleep(500);
> > > >
> > > > message = session.createTextMessage("test object 2");
> > > > message.setStringProperty("LVK", "foo");
> > > >
> > > > producer.send(message);
> > > >
> > > > Thread.sleep(500);
> > > >
> > > > session.createConsumer(destination, "LVK = 'foo'").setMessageListener(m ->
> > > >     processMessage(m));
> > > >
> > > > ***
> > > >
> > > > The first time this code is run, I only get the error when the
> > > > message is sent the second time. After that, the error happens twice
> > > > per run.
> > > > If I go into the broker console and purge the queue, then the first
> > > > message send works fine, error is thrown every subsequent time.
> > > >
> > > > The stack trace (I'm not able to copy/paste from my development
> > > > environment) looks like:
> > > >
> > > > JournalImpl.checkKnownRecordID(JournalImpl.java:1074)
> > > > JournalImpl.appendUpdateRecord(JournalImpl.java:881)
> > > > Journal.appendUpdateRecord(Journal.java:98)
> > > > AbstractJournalStorageManager.storeAcknowledge(AbstractJournalStorageManager.java:425)
> > > > QueueImpl.acknowledge(QueueImpl.java:1533)
> > > > LastValueQueue.acknowledge(MessageReferenceImpl.java:235)
> > > > MessageReferenceImpl.acknowledge(MessageReferenceImpl.java:235)
> > > > LastValueQueue.replaceLVQMessage(LastValueQueue.java:172)
> > > > LastValueQueue.addTail(LastValueQueue.java:107)
> > > > ...
> > > >
> > > > Tracing it back that far, it looks like the failed acknowledge
> > > > shouldn't cause major problems for the functionality of the system.
> > > > The code is all executing as you'd expect it to and the queue on the
> > > > broker always has the expected message in it. The error is still a
> > > > concern though simply as a matter of performance - scalability is a
> > > > concern for me and the overhead of logging a bunch of errors would be
> > > > a problem.
> > > >
> > > > Let me know if there's any other information I can provide that could
> > > > help diagnose this issue.
> > > >
> > > > Thanks,
> > > > Matt
> > > >
> > > > --
> > > > Sent from:
> > > > http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
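
For anyone following the thread, the three reproduction steps quoted above (send, receive and ack, send again) can be illustrated with a toy model. This is only a sketch under stated assumptions, not the Artemis journal: `LvqJournalSketch`, `ToyJournal` and its three methods are hypothetical names invented here, and the only thing taken from the thread is the wording of the exception message and the idea that an ack is an update against a record that must still be known.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only. This is NOT the Artemis JournalImpl; every name here is
// hypothetical and exists just to illustrate the sequence from the thread.
class LvqJournalSketch {

    /** Minimal stand-in for the journal's set of known record IDs. */
    static class ToyJournal {
        private final Set<Long> records = new HashSet<>();

        // Storing a message adds its record ID.
        void appendAdd(long id) {
            records.add(id);
        }

        // An ack is modeled as an update, which requires a known record.
        void appendUpdate(long id) {
            if (!records.contains(id)) {
                throw new IllegalStateException(
                    "Cannot find add info " + id + " on compactor or current records");
            }
        }

        // Deleting a message forgets its record ID.
        void appendDelete(long id) {
            records.remove(id);
        }
    }

    /** Runs the three steps and returns the error text, or "no error". */
    static String reproduce() {
        ToyJournal journal = new ToyJournal();
        long oldMessageId = 1L;

        journal.appendAdd(oldMessageId);     // step 1: send the first message
        journal.appendUpdate(oldMessageId);  // step 2: consumer acks it...
        journal.appendDelete(oldMessageId);  // ...and the record is deleted

        journal.appendAdd(2L);               // step 3: send a second message
        try {
            // The LVQ still holds the old reference in memory and tries to
            // ack it as "replaced", but its record is already gone.
            journal.appendUpdate(oldMessageId);
            return "no error";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
    }
}
```

In this model the second ack fails only because the first ack was followed by a delete. If the non-destructive queue skipped the delete for normal consumer acks, as suggested in the thread, the old record would still be known when the replacement ack arrives.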